Use updated droplet-rs

(currently only local installation of droplet supported)
This commit is contained in:
quexeky
2026-01-29 08:52:21 +11:00
parent b7a429543a
commit 9077a30bee
5 changed files with 55 additions and 73 deletions
+4 -1
View File
@@ -100,8 +100,9 @@ pub async fn manage_configuration(
let operator = config_option.build()?;
generate_manifest(&operator).await?;
info!("Finished uploading manifest");
generate_speedtest(&operator).await?;
info!("Finished uploading speedtest");
Ok(())
}
@@ -129,8 +130,10 @@ async fn generate_speedtest(operator: &Operator) -> anyhow::Result<()> {
progress_bar.set_position(progress_int);
});
let written = tokio::io::copy(&mut reader, &mut writer).await?;
progress_bar.finish();
debug!("Wrote {} bytes to {:?}", written, operator.info());
writer.into_inner().close().await?;
debug!("Closed writer");
Ok(())
}
+21 -52
View File
@@ -1,4 +1,4 @@
use std::{io::SeekFrom, path::Path, pin::Pin};
use std::path::Path;
use crate::{
cli::UploadInfo,
@@ -6,19 +6,10 @@ use crate::{
manifest::{CompressionOption, DepotManifest, generate_v2_manifest},
operator_builder::OperatorBuilder,
};
use droplet_rs::manifest::ChunkData;
use futures::{AsyncWriteExt, StreamExt, TryStreamExt, future::join_all, stream};
use futures::AsyncWriteExt;
use log::info;
use opendal::Operator;
use tokio::{
fs::File,
io::{AsyncRead, AsyncReadExt, AsyncSeekExt, Take},
};
use tokio_util::{
bytes::Bytes,
compat::FuturesAsyncWriteCompatExt,
io::{ReaderStream, StreamReader},
};
use opendal::{FuturesAsyncWriter, Operator};
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
pub async fn upload(
info: &UploadInfo,
@@ -33,21 +24,25 @@ pub async fn upload(
let mut existing_depot_manifest = get_depot_manifest(&operator).await?;
let v2_manifest = generate_v2_manifest(Path::new(path)).await?;
info!("Uploading chunks");
for (id, data) in &v2_manifest.chunks {
info!("Uploading chunk id {id}");
let mut reader = generate_chunk_readstream(path, data).await;
let mut writer = operator
.writer(&format!("{game_id}/{version_id}/{id}"))
.await?
.into_futures_async_write()
.compat_write();
tokio::io::copy(&mut reader, &mut writer).await?;
writer.into_inner().close().await?;
}
let v2_manifest = generate_v2_manifest(
Path::new(path),
async |id: String| {
info!("Uploading chunk id {id}");
let writer = operator
.writer(&format!("{game_id}/{version_id}/{id}"))
.await
.unwrap()
.into_futures_async_write()
.compat_write();
writer
},
|writer: Compat<FuturesAsyncWriter>| async {
writer.into_inner().close().await.unwrap();
},
)
.await?;
info!("Finished uploading chunks");
@@ -59,32 +54,6 @@ pub async fn upload(
Ok(())
}
// Black magic don't touch
/// Connects all of the files at the correct start and end points into a single, continuous AsyncRead object
pub async fn generate_chunk_readstream<'a, P: AsRef<Path> + 'a>(
path: P,
data: &'a ChunkData,
) -> Pin<Box<impl AsyncRead>> {
let path = path.as_ref().to_path_buf();
let files = data.files.clone();
let stream = stream::iter(files)
.map(move |f| {
let path = path.clone();
// Lazy block to ensure that not too many files get opened at once
async move {
let mut file = File::open(path.join(f.filename)).await?;
file.seek(SeekFrom::Start(f.start as u64)).await?;
tokio::io::Result::Ok(file.take(f.length as u64))
}
})
.buffered(2) // Could also be 1. Just removes a bit of latency from opening files buy preparing the next one immediately
.map_ok(|file| ReaderStream::new(file))
.try_flatten();
let reader = StreamReader::new(stream);
Box::pin(reader)
}
async fn get_depot_manifest(operator: &Operator) -> Result<DepotManifest, anyhow::Error> {
let existing_depot_manifest = operator.read("manifest.json").await?.to_bytes();
let existing_depot_manifest: DepotManifest =
+13 -3
View File
@@ -1,9 +1,12 @@
use std::{collections::HashMap, path::Path};
use droplet_rs::manifest::{Manifest, generate_manifest_rusty};
use droplet_rs::manifest::{
Manifest, generate_manifest_rusty, generate_manifest_rusty_v2,
};
use indicatif::{ProgressBar, ProgressStyle};
use log::info;
use serde::{Deserialize, Serialize};
use tokio::io::AsyncWrite;
#[derive(Serialize, Deserialize)]
pub struct DepotManifest {
@@ -37,20 +40,27 @@ impl DepotManifest {
}
}
pub async fn generate_v2_manifest(dir: &Path) -> anyhow::Result<Manifest> {
pub async fn generate_v2_manifest<W, F, CloseF>(dir: &Path, factory: F, closer: CloseF) -> anyhow::Result<Manifest>
where
W: AsyncWrite + Unpin,
F: AsyncFn(String) -> W,
CloseF: AsyncFn(W)
{
let progress_bar = ProgressBar::new(10_000).with_style(
ProgressStyle::default_bar()
.template("[{elapsed_precise}] [ETA {eta}] {bar} {percent_precise}%")
.unwrap(),
);
generate_manifest_rusty(
generate_manifest_rusty_v2(
dir,
|progress| {
let progress_int = (progress * 100f32).round() as u64;
progress_bar.set_position(progress_int);
},
|log| progress_bar.suspend(|| info!("{}", log)),
factory,
closer
)
.await
}