fix: Speedtest not registering number of bytes read

This commit is contained in:
quexeky
2026-01-25 22:07:50 +11:00
parent 8c8e9ad4c9
commit bb3280cedf
10 changed files with 90 additions and 51 deletions
+44 -19
View File
@@ -1,15 +1,22 @@
use crate::{commands::{connect::{
config_option::{ConfigOption, ConfigOptionCli},
configurable::Configure,
s3::S3Config,
}, upload::speedtest::Speedtest}, manifest::DepotManifest};
use crate::{
commands::{
connect::{
config_option::{ConfigOption, ConfigOptionCli},
configurable::Configure,
s3::S3Config,
speedtest::{SPEEDTEST_PATH, Speedtest}
},
},
manifest::DepotManifest,
};
use dialoguer::{Confirm, theme::ColorfulTheme};
use futures::AsyncWriteExt;
use indicatif::{ProgressBar, ProgressStyle};
use log::{debug, info, warn};
use opendal::Operator;
use serde::{Deserialize, Serialize};
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use std::{collections::HashMap, fs, ops::Not};
use tokio_util::compat::FuturesAsyncWriteCompatExt;
const CONFIG_DIR: &str = "downpour/config.json";
@@ -41,7 +48,7 @@ impl Config {
let save_path = dirs::config_dir()
.expect("Apparently your home directory doesn't exist") // Should probably formalise that error
.join(CONFIG_DIR);
if fs::exists(&save_path).expect(&format!("Could not read save path {:#?}", &save_path)) {
if fs::exists(&save_path).unwrap_or_else(|_| panic!("Could not read save path {:#?}", &save_path)) {
serde_json::from_str(&fs::read_to_string(save_path).unwrap()).unwrap()
} else {
Config::new()
@@ -74,7 +81,6 @@ impl Config {
})
.next()
.cloned()
.map(|c| c.into())
} else {
None
}
@@ -89,7 +95,7 @@ pub async fn manage_configuration(
name: &String,
option: &ConfigOptionCli,
) -> anyhow::Result<()> {
if config.exists(&name) {
if config.exists(name) {
let confirm = Confirm::with_theme(&ColorfulTheme::default())
.with_prompt(format!(
"An entry already exists with the name \"{}\". Would you like to overwrite it?",
@@ -106,31 +112,50 @@ pub async fn manage_configuration(
config.add_item(name.clone(), config_option.clone());
let operator = config_option.build()?;
generate_speedtest(&operator).await?;
generate_manifest(&operator).await?;
generate_speedtest(&operator).await?;
Ok(())
}
async fn generate_speedtest(operator: &Operator) -> anyhow::Result<()> {
if operator.exists("speedtest").await?.not() {
// Workaround to operator.exists("...") also logging a 404 warning
let lister = operator.list_with(SPEEDTEST_PATH).limit(1).await?;
if lister.is_empty().not() {
info!("Speedtest already exists on Depot. Skipping speedtest upload...");
return Ok(())
return Ok(());
}
let mut writer = operator.writer("speedtest").await?.into_futures_async_write().compat_write();
let mut reader = Speedtest::new();
let mut writer = operator
.writer(SPEEDTEST_PATH)
.await?
.into_futures_async_write()
.compat_write();
let progress_bar = ProgressBar::new(10_000).with_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] [ETA {eta}] {bar} {percent_precise}%")
.unwrap(),
);
let mut reader = Speedtest::new(|progress| {
let progress_int = (progress * 100f32).round() as u64;
progress_bar.set_position(progress_int);
});
let written = tokio::io::copy(&mut reader, &mut writer).await?;
debug!("Wrote {} bytes to {:?}", written, operator.info());
writer.into_inner().close().await?;
Ok(())
}
async fn generate_manifest(operator: &Operator) -> anyhow::Result<()> {
info!("Manifest already exists on Depot. Skipping manifest upload...");
if operator.exists("manifest.json").await?.not() {
return Ok(())
let lister = operator.list_with("manifest.json").limit(1).await?;
if lister.is_empty().not() {
info!("Manifest already exists on Depot. Skipping manifest upload...");
return Ok(());
}
let data = DepotManifest::new();
operator.write("manifest.json", serde_json::to_string(&data)?).await?;
operator
.write("manifest.json", serde_json::to_string(&data)?)
.await?;
Ok(())
}
}
+3 -3
View File
@@ -8,7 +8,7 @@ macro_rules! interactive_variable {
let $var = if let Some($var) = $value.$var {
$var
} else {
crate::commands::connect::interactive::query_variable($prompt).unwrap()
$crate::commands::connect::interactive::query_variable($prompt).unwrap()
};
};
}
@@ -18,7 +18,7 @@ macro_rules! interactive_optional_variable {
let $var = if let Some($var) = $value.$var {
Some($var)
} else {
crate::commands::connect::interactive::query_optional_variable($prompt).unwrap()
$crate::commands::connect::interactive::query_optional_variable($prompt).unwrap()
};
};
}
@@ -40,7 +40,7 @@ where
.with_prompt(prompt.to_string())
.allow_empty(true)
.interact_text()?;
if input.to_string().len() == 0 {
if input.to_string().is_empty() {
return Ok(None);
}
Ok(Some(input))
+1
View File
@@ -4,3 +4,4 @@ pub mod s3;
#[macro_use]
pub mod interactive;
pub mod config_option;
pub mod speedtest;
+1 -1
View File
@@ -55,7 +55,7 @@ impl OperatorBuilder for S3Config {
.secret_access_key(&self.secret_key)
.region(&self.region)
.endpoint(&self.endpoint)
.root(self.root.as_ref().map(|s| s.as_str()).unwrap_or("/"))
.root(self.root.as_deref().unwrap_or("/"))
.bucket(&self.bucket_name)
.disable_config_load();
@@ -2,14 +2,15 @@ use rand::{RngCore, SeedableRng, rng, rngs::StdRng};
use tokio::io::AsyncRead;
#[derive(Clone, Debug)]
pub struct Speedtest {
pub struct Speedtest<F: Fn(f32)> {
core: rand::rngs::StdRng,
to_write: usize,
callback: Box<F>,
}
pub const SPEEDTEST_BYTES: usize = 64 * 1024 * 1024;
pub const SPEEDTEST_PATH: &str = "speedtest";
impl AsyncRead for Speedtest {
impl<F: Fn(f32)> AsyncRead for Speedtest<F> {
fn poll_read(
self: std::pin::Pin<&mut Self>,
_cx: &mut std::task::Context<'_>,
@@ -17,17 +18,24 @@ impl AsyncRead for Speedtest {
) -> std::task::Poll<std::io::Result<()>> {
let mut s = self;
let to_write = buf.remaining().min(s.to_write);
s.to_write = s.to_write.saturating_sub(to_write);
let fill_slice = buf.initialize_unfilled_to(to_write);
s.core.fill_bytes(fill_slice);
let filled = {
let fill_slice = buf.initialize_unfilled_to(to_write);
s.core.fill_bytes(fill_slice);
fill_slice.len()
};
s.to_write = s.to_write.saturating_sub(filled);
(s.callback)((1f32 - (s.to_write as f32 / SPEEDTEST_BYTES as f32)) * 100f32);
buf.advance(filled);
std::task::Poll::Ready(Ok(()))
}
}
impl Speedtest {
pub fn new() -> Self {
impl<F: Fn(f32)> Speedtest<F> {
pub fn new(callback: F) -> Self {
Self {
core: StdRng::from_rng(&mut rng()),
to_write: SPEEDTEST_BYTES,
callback: Box::new(callback),
}
}
}
+1 -2
View File
@@ -45,7 +45,7 @@ impl ChunkReader {
.collect::<Vec<LimitedFileReader>>()
.into_iter();
Self {
files: files,
files,
active: None,
}
}
@@ -65,7 +65,6 @@ impl AsyncRead for ChunkReader {
continue;
}
Ok(n) => {
buf.advance(n);
return Poll::Ready(Ok(()));
+18 -9
View File
@@ -1,10 +1,14 @@
use std::path::{Path, PathBuf};
use std::path::Path;
use crate::{
cli::UploadInfo,
commands::{connect::config::Config, upload::{chunk_reader::ChunkReader, uploadable::OperatorBuilder}},
commands::{
connect::config::Config,
upload::{chunk_reader::ChunkReader, uploadable::OperatorBuilder},
},
manifest::generate_manifest,
};
use futures::AsyncWriteExt;
use log::info;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
@@ -13,20 +17,25 @@ pub async fn upload(info: &UploadInfo, config: Config) -> anyhow::Result<()> {
let path = &info.path;
let version_id = &info.version_id;
let manifest = generate_manifest(&Path::new(path)).await?;
let manifest = generate_manifest(Path::new(path)).await?;
let operator = match info.upload_style {
crate::cli::UploadStyle::S3 => config
.get_active_s3()
.ok_or(anyhow::Error::msg("Could not get active S3 value"))?.build()?,
.get_active_s3()
.ok_or(anyhow::Error::msg("Could not get active S3 value"))?
.build()?,
};
info!("Uploading chunks");
for (id, data) in &manifest.chunks {
info!("Uploading chunk id {id}");
let mut reader = ChunkReader::new(&path, data);
let mut writer = operator.writer(&format!("{game_id}/{version_id}/{id}")).await?.into_futures_async_write().compat_write();
tokio::io::copy(&mut reader, &mut writer);
let mut reader = ChunkReader::new(path, data);
let mut writer = operator
.writer(&format!("{game_id}/{version_id}/{id}"))
.await?
.into_futures_async_write()
.compat_write();
tokio::io::copy(&mut reader, &mut writer).await?;
writer.into_inner().close().await?;
}
info!("Finished uploading chunks");
Ok(())
}
+2 -3
View File
@@ -1,4 +1,3 @@
pub mod interface;
pub mod speedtest;
pub mod uploadable;
pub mod chunk_reader;
pub mod interface;
pub mod uploadable;
-2
View File
@@ -1,7 +1,5 @@
use std::path::PathBuf;
use async_trait::async_trait;
use droplet_rs::manifest::{ChunkData, Manifest};
use opendal::Operator;
#[async_trait]
+5 -5
View File
@@ -25,7 +25,7 @@ impl DepotManifest {
content: HashMap::new(),
}
}
pub fn add(&mut self, game_id: String, version_id: String, compression: CompressionOption) {
pub fn append(&mut self, game_id: String, version_id: String, compression: CompressionOption) {
self.content.insert(
game_id,
DepotManifestGameData {
@@ -37,12 +37,13 @@ impl DepotManifest {
}
pub async fn generate_manifest(dir: &Path) -> anyhow::Result<Manifest> {
let progress_bar = ProgressBar::new(100_00).with_style(
let progress_bar = ProgressBar::new(10_000).with_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] [ETA {eta}] {bar} {percent_precise}%")
.unwrap(),
);
let res = generate_manifest_rusty(
generate_manifest_rusty(
dir,
|progress| {
let progress_int = (progress * 100f32).round() as u64;
@@ -50,6 +51,5 @@ pub async fn generate_manifest(dir: &Path) -> anyhow::Result<Manifest> {
},
|log| progress_bar.println(log),
)
.await;
res
.await
}