chore: Migrate to using ReaderStream instead of ChunkReader
This commit is contained in:
@@ -1,106 +0,0 @@
|
||||
use droplet_rs::manifest::ChunkData;
|
||||
use std::{
|
||||
cmp::min,
|
||||
fs::File,
|
||||
io::{Read, Seek, SeekFrom},
|
||||
path::Path,
|
||||
task::Poll,
|
||||
vec::IntoIter,
|
||||
};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
pub struct ChunkReader {
|
||||
files: IntoIter<LimitedFileReader>,
|
||||
active: Option<LimitedFileReader>,
|
||||
}
|
||||
|
||||
pub struct LimitedFileReader {
|
||||
file: File,
|
||||
to_read_remaining: usize,
|
||||
}
|
||||
|
||||
impl Read for LimitedFileReader {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let to_read = min(self.to_read_remaining, buf.len());
|
||||
let (to_read, _remaining) = buf.split_at_mut(to_read);
|
||||
let read = self.file.read(to_read)?;
|
||||
self.to_read_remaining -= read;
|
||||
Ok(read)
|
||||
}
|
||||
}
|
||||
|
||||
impl ChunkReader {
|
||||
pub fn new(path: impl AsRef<Path>, chunk: &ChunkData) -> Self {
|
||||
let files = chunk
|
||||
.files
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let mut file = File::open(path.as_ref().join(&f.filename)).unwrap();
|
||||
file.seek(SeekFrom::Start(f.start as u64)).unwrap(); // TODO: Fix unwraps
|
||||
LimitedFileReader {
|
||||
file,
|
||||
to_read_remaining: f.length,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<LimitedFileReader>>()
|
||||
.into_iter();
|
||||
Self {
|
||||
files,
|
||||
active: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AsyncRead for ChunkReader {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let mut s = self;
|
||||
loop {
|
||||
if let Some(active) = &mut s.active {
|
||||
match active.read(buf.initialize_unfilled()) {
|
||||
Ok(0) => {
|
||||
s.active = None;
|
||||
continue;
|
||||
}
|
||||
Ok(n) => {
|
||||
buf.advance(n);
|
||||
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
Err(e) => return Poll::Ready(Err(e)),
|
||||
}
|
||||
} else {
|
||||
if let Some(next) = s.files.next() {
|
||||
s.active = Some(next);
|
||||
} else {
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// impl Read for ChunkReader {
|
||||
// fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
// loop {
|
||||
// if let Some(active) = &mut self.active {
|
||||
// match active.read(buf) {
|
||||
// Ok(0) => {
|
||||
// self.active = None;
|
||||
// continue;
|
||||
// }
|
||||
// Ok(n) => return Ok(n),
|
||||
// Err(e) => return Err(e),
|
||||
// }
|
||||
// } else {
|
||||
// if let Some(next) = self.files.next() {
|
||||
// self.active = Some(next);
|
||||
// } else {
|
||||
// return Ok(0);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
@@ -1,18 +1,24 @@
|
||||
use std::path::Path;
|
||||
use std::{io::SeekFrom, path::Path, pin::Pin};
|
||||
|
||||
use crate::{
|
||||
cli::UploadInfo,
|
||||
commands::{
|
||||
connect::{config::Config, config_option::ConfigOption},
|
||||
upload::chunk_reader::ChunkReader,
|
||||
},
|
||||
commands::connect::{config::Config, config_option::ConfigOption},
|
||||
manifest::{CompressionOption, DepotManifest, generate_v2_manifest},
|
||||
operator_builder::OperatorBuilder,
|
||||
};
|
||||
use futures::AsyncWriteExt;
|
||||
use droplet_rs::manifest::ChunkData;
|
||||
use futures::{AsyncWriteExt, StreamExt, TryStreamExt, future::join_all, stream};
|
||||
use log::info;
|
||||
use opendal::Operator;
|
||||
use tokio_util::compat::FuturesAsyncWriteCompatExt;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncRead, AsyncReadExt, AsyncSeekExt, Take},
|
||||
};
|
||||
use tokio_util::{
|
||||
bytes::Bytes,
|
||||
compat::FuturesAsyncWriteCompatExt,
|
||||
io::{ReaderStream, StreamReader},
|
||||
};
|
||||
|
||||
pub async fn upload(
|
||||
info: &UploadInfo,
|
||||
@@ -33,7 +39,7 @@ pub async fn upload(
|
||||
|
||||
for (id, data) in &v2_manifest.chunks {
|
||||
info!("Uploading chunk id {id}");
|
||||
let mut reader = ChunkReader::new(path, data);
|
||||
let mut reader = generate_chunk_readstream(path, data).await;
|
||||
let mut writer = operator
|
||||
.writer(&format!("{game_id}/{version_id}/{id}"))
|
||||
.await?
|
||||
@@ -53,6 +59,32 @@ pub async fn upload(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Black magic don't touch
|
||||
/// Connects all of the files at the correct start and end points into a single, continuous AsyncRead object
|
||||
pub async fn generate_chunk_readstream<'a, P: AsRef<Path> + 'a>(
|
||||
path: P,
|
||||
data: &'a ChunkData,
|
||||
) -> Pin<Box<impl AsyncRead>> {
|
||||
let path = path.as_ref().to_path_buf();
|
||||
let files = data.files.clone();
|
||||
|
||||
let stream = stream::iter(files)
|
||||
.map(move |f| {
|
||||
let path = path.clone();
|
||||
// Lazy block to ensure that not too many files get opened at once
|
||||
async move {
|
||||
let mut file = File::open(path.join(f.filename)).await?;
|
||||
file.seek(SeekFrom::Start(f.start as u64)).await?;
|
||||
tokio::io::Result::Ok(file.take(f.length as u64))
|
||||
}
|
||||
})
|
||||
.buffered(2) // Could also be 1. Just removes a bit of latency from opening files buy preparing the next one immediately
|
||||
.map_ok(|file| ReaderStream::new(file))
|
||||
.try_flatten();
|
||||
let reader = StreamReader::new(stream);
|
||||
Box::pin(reader)
|
||||
}
|
||||
|
||||
async fn get_depot_manifest(operator: &Operator) -> Result<DepotManifest, anyhow::Error> {
|
||||
let existing_depot_manifest = operator.read("manifest.json").await?.to_bytes();
|
||||
let existing_depot_manifest: DepotManifest =
|
||||
|
||||
@@ -1,2 +1 @@
|
||||
pub mod chunk_reader;
|
||||
pub mod interface;
|
||||
|
||||
Reference in New Issue
Block a user