From b7a429543aeac0b8ae8726a3d0810658016f2c38 Mon Sep 17 00:00:00 2001 From: quexeky Date: Mon, 26 Jan 2026 16:09:20 +1100 Subject: [PATCH] chore: Migrate to using ReaderStream instead of ChunkReader --- cli/src/commands/upload/chunk_reader.rs | 106 ------------------------ cli/src/commands/upload/interface.rs | 48 +++++++++-- cli/src/commands/upload/mod.rs | 1 - 3 files changed, 40 insertions(+), 115 deletions(-) delete mode 100644 cli/src/commands/upload/chunk_reader.rs diff --git a/cli/src/commands/upload/chunk_reader.rs b/cli/src/commands/upload/chunk_reader.rs deleted file mode 100644 index e51dc566..00000000 --- a/cli/src/commands/upload/chunk_reader.rs +++ /dev/null @@ -1,106 +0,0 @@ -use droplet_rs::manifest::ChunkData; -use std::{ - cmp::min, - fs::File, - io::{Read, Seek, SeekFrom}, - path::Path, - task::Poll, - vec::IntoIter, -}; -use tokio::io::AsyncRead; - -pub struct ChunkReader { - files: IntoIter, - active: Option, -} - -pub struct LimitedFileReader { - file: File, - to_read_remaining: usize, -} - -impl Read for LimitedFileReader { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - let to_read = min(self.to_read_remaining, buf.len()); - let (to_read, _remaining) = buf.split_at_mut(to_read); - let read = self.file.read(to_read)?; - self.to_read_remaining -= read; - Ok(read) - } -} - -impl ChunkReader { - pub fn new(path: impl AsRef, chunk: &ChunkData) -> Self { - let files = chunk - .files - .iter() - .map(|f| { - let mut file = File::open(path.as_ref().join(&f.filename)).unwrap(); - file.seek(SeekFrom::Start(f.start as u64)).unwrap(); // TODO: Fix unwraps - LimitedFileReader { - file, - to_read_remaining: f.length, - } - }) - .collect::>() - .into_iter(); - Self { - files, - active: None, - } - } -} -impl AsyncRead for ChunkReader { - fn poll_read( - self: std::pin::Pin<&mut Self>, - _cx: &mut std::task::Context<'_>, - buf: &mut tokio::io::ReadBuf<'_>, - ) -> std::task::Poll> { - let mut s = self; - loop { - if let Some(active) = &mut s.active { - match active.read(buf.initialize_unfilled()) { - Ok(0) => { - s.active = None; - continue; - } - Ok(n) => { - buf.advance(n); - - return Poll::Ready(Ok(())); - } - Err(e) => return Poll::Ready(Err(e)), - } - } else { - if let Some(next) = s.files.next() { - s.active = Some(next); - } else { - return Poll::Ready(Ok(())); - } - } - } - } -} - -// impl Read for ChunkReader { -// fn read(&mut self, buf: &mut [u8]) -> std::io::Result { -// loop { -// if let Some(active) = &mut self.active { -// match active.read(buf) { -// Ok(0) => { -// self.active = None; -// continue; -// } -// Ok(n) => return Ok(n), -// Err(e) => return Err(e), -// } -// } else { -// if let Some(next) = self.files.next() { -// self.active = Some(next); -// } else { -// return Ok(0); -// } -// } -// } -// } -// } diff --git a/cli/src/commands/upload/interface.rs b/cli/src/commands/upload/interface.rs index ca2c97ff..bb74fee9 100644 --- a/cli/src/commands/upload/interface.rs +++ b/cli/src/commands/upload/interface.rs @@ -1,18 +1,24 @@ -use std::path::Path; +use std::{io::SeekFrom, path::Path, pin::Pin}; use crate::{ cli::UploadInfo, - commands::{ - connect::{config::Config, config_option::ConfigOption}, - upload::chunk_reader::ChunkReader, - }, + commands::connect::{config::Config, config_option::ConfigOption}, manifest::{CompressionOption, DepotManifest, generate_v2_manifest}, operator_builder::OperatorBuilder, }; -use futures::AsyncWriteExt; +use droplet_rs::manifest::ChunkData; +use futures::{AsyncWriteExt, StreamExt, TryStreamExt, future::join_all, stream}; use log::info; use opendal::Operator; -use tokio_util::compat::FuturesAsyncWriteCompatExt; +use tokio::{ + fs::File, + io::{AsyncRead, AsyncReadExt, AsyncSeekExt, Take}, +}; +use tokio_util::{ + bytes::Bytes, + compat::FuturesAsyncWriteCompatExt, + io::{ReaderStream, StreamReader}, +}; pub async fn upload( info: &UploadInfo, @@ -33,7 +39,7 @@ pub async fn upload( for (id, data) in &v2_manifest.chunks { info!("Uploading chunk id {id}"); - let mut reader = ChunkReader::new(path, data); + let mut reader = generate_chunk_readstream(path, data).await; let mut writer = operator .writer(&format!("{game_id}/{version_id}/{id}")) .await? @@ -53,6 +59,32 @@ pub async fn upload( Ok(()) } +// Black magic don't touch +/// Connects all of the files at the correct start and end points into a single, continuous AsyncRead object +pub async fn generate_chunk_readstream<'a, P: AsRef + 'a>( + path: P, + data: &'a ChunkData, +) -> Pin> { + let path = path.as_ref().to_path_buf(); + let files = data.files.clone(); + + let stream = stream::iter(files) + .map(move |f| { + let path = path.clone(); + // Lazy block to ensure that not too many files get opened at once + async move { + let mut file = File::open(path.join(f.filename)).await?; + file.seek(SeekFrom::Start(f.start as u64)).await?; + tokio::io::Result::Ok(file.take(f.length as u64)) + } + }) + .buffered(2) // Could also be 1. Just removes a bit of latency from opening files buy preparing the next one immediately + .map_ok(|file| ReaderStream::new(file)) + .try_flatten(); + let reader = StreamReader::new(stream); + Box::pin(reader) +} + async fn get_depot_manifest(operator: &Operator) -> Result { let existing_depot_manifest = operator.read("manifest.json").await?.to_bytes(); let existing_depot_manifest: DepotManifest = diff --git a/cli/src/commands/upload/mod.rs b/cli/src/commands/upload/mod.rs index cdaa9e81..8d3d626b 100644 --- a/cli/src/commands/upload/mod.rs +++ b/cli/src/commands/upload/mod.rs @@ -1,2 +1 @@ -pub mod chunk_reader; pub mod interface;