fix: AsyncRead not advancing initialised buffer
This commit is contained in:
@@ -3,7 +3,9 @@ use std::{
|
||||
cmp::min,
|
||||
fs::File,
|
||||
io::{Read, Seek, SeekFrom},
|
||||
task::Poll, vec::IntoIter,
|
||||
path::Path,
|
||||
task::Poll,
|
||||
vec::IntoIter,
|
||||
};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
@@ -28,15 +30,20 @@ impl Read for LimitedFileReader {
|
||||
}
|
||||
|
||||
impl ChunkReader {
|
||||
pub fn new(chunk: &ChunkData) -> Self {
|
||||
let files = chunk.files.iter().map(|f| {
|
||||
let mut file = File::open(&f.filename).unwrap();
|
||||
file.seek(SeekFrom::Start(f.start as u64)).unwrap(); // TODO: Fix unwraps
|
||||
LimitedFileReader {
|
||||
file,
|
||||
to_read_remaining: f.length,
|
||||
}
|
||||
}).collect::<Vec<LimitedFileReader>>().into_iter();
|
||||
pub fn new(path: impl AsRef<Path>, chunk: &ChunkData) -> Self {
|
||||
let files = chunk
|
||||
.files
|
||||
.iter()
|
||||
.map(|f| {
|
||||
let mut file = File::open(path.as_ref().join(&f.filename)).unwrap();
|
||||
file.seek(SeekFrom::Start(f.start as u64)).unwrap(); // TODO: Fix unwraps
|
||||
LimitedFileReader {
|
||||
file,
|
||||
to_read_remaining: f.length,
|
||||
}
|
||||
})
|
||||
.collect::<Vec<LimitedFileReader>>()
|
||||
.into_iter();
|
||||
Self {
|
||||
files: files,
|
||||
active: None,
|
||||
@@ -57,7 +64,12 @@ impl AsyncRead for ChunkReader {
|
||||
s.active = None;
|
||||
continue;
|
||||
}
|
||||
Ok(_) => return Poll::Ready(Ok(())),
|
||||
Ok(n) => {
|
||||
|
||||
buf.advance(n);
|
||||
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
Err(e) => return Poll::Ready(Err(e)),
|
||||
}
|
||||
} else {
|
||||
@@ -71,25 +83,25 @@ impl AsyncRead for ChunkReader {
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for ChunkReader {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
loop {
|
||||
if let Some(active) = &mut self.active {
|
||||
match active.read(buf) {
|
||||
Ok(0) => {
|
||||
self.active = None;
|
||||
continue;
|
||||
}
|
||||
Ok(n) => return Ok(n),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
} else {
|
||||
if let Some(next) = self.files.next() {
|
||||
self.active = Some(next);
|
||||
} else {
|
||||
return Ok(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// impl Read for ChunkReader {
|
||||
// fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
// loop {
|
||||
// if let Some(active) = &mut self.active {
|
||||
// match active.read(buf) {
|
||||
// Ok(0) => {
|
||||
// self.active = None;
|
||||
// continue;
|
||||
// }
|
||||
// Ok(n) => return Ok(n),
|
||||
// Err(e) => return Err(e),
|
||||
// }
|
||||
// } else {
|
||||
// if let Some(next) = self.files.next() {
|
||||
// self.active = Some(next);
|
||||
// } else {
|
||||
// return Ok(0);
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::path::Path;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use crate::{
|
||||
cli::UploadInfo,
|
||||
@@ -24,7 +24,7 @@ pub async fn upload(info: &UploadInfo, config: Config) -> anyhow::Result<()> {
|
||||
info!("Uploading chunks");
|
||||
for (id, data) in &manifest.chunks {
|
||||
info!("Uploading chunk id {id}");
|
||||
uploader.upload_chunk(game_id, version_id, id, data).await?;
|
||||
uploader.upload_chunk(PathBuf::from(path), game_id, version_id, id, data).await?;
|
||||
}
|
||||
info!("Finished uploading chunks");
|
||||
|
||||
|
||||
@@ -1,8 +1,15 @@
|
||||
use crate::commands::{configure::s3::S3Config, upload::{
|
||||
chunk_reader::ChunkReader, speedtest::{SPEEDTEST_PATH, Speedtest}, uploadable::Uploadable
|
||||
}};
|
||||
use crate::commands::{
|
||||
configure::s3::S3Config,
|
||||
upload::{
|
||||
chunk_reader::ChunkReader,
|
||||
speedtest::{SPEEDTEST_PATH, Speedtest},
|
||||
uploadable::Uploadable,
|
||||
},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use droplet_rs::manifest::{ChunkData, Manifest};
|
||||
use droplet_rs::{
|
||||
manifest::{ChunkData, Manifest},
|
||||
};
|
||||
use s3::Bucket;
|
||||
use serde_json::json;
|
||||
use std::{ops::Deref, path::PathBuf};
|
||||
@@ -21,20 +28,26 @@ impl S3 {
|
||||
impl Uploadable for S3 {
|
||||
async fn upload_chunk(
|
||||
&mut self,
|
||||
base_path: PathBuf,
|
||||
id: &String,
|
||||
version: &String,
|
||||
chunk_id: &String,
|
||||
chunk: &ChunkData,
|
||||
) -> anyhow::Result<()> {
|
||||
let path = &PathBuf::from(id).join(version).join(chunk_id).to_string_lossy().to_string();
|
||||
let mut reader = ChunkReader::new(chunk);
|
||||
self.put_object_stream(&mut reader, path).await?;
|
||||
let path = &PathBuf::from(id)
|
||||
.join(version)
|
||||
.join(chunk_id)
|
||||
.to_string_lossy()
|
||||
.to_string();
|
||||
let mut reader = ChunkReader::new(&base_path, chunk);
|
||||
self.put_object_stream(&mut reader, &path).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn upload_speedtest(&mut self) -> anyhow::Result<()> {
|
||||
if self.object_exists(SPEEDTEST_PATH).await? {
|
||||
return Ok(());
|
||||
}
|
||||
println!("Uploading speedtest");
|
||||
let mut speedtest = Speedtest::new();
|
||||
self.put_object_stream(&mut speedtest, SPEEDTEST_PATH)
|
||||
.await?;
|
||||
@@ -47,7 +60,7 @@ impl Uploadable for S3 {
|
||||
game_id: &String,
|
||||
version_id: &String,
|
||||
) -> anyhow::Result<()> {
|
||||
self.put_object(
|
||||
self.put_object_builder(
|
||||
PathBuf::from(game_id)
|
||||
.join(version_id)
|
||||
.join("manifest.json")
|
||||
@@ -55,6 +68,8 @@ impl Uploadable for S3 {
|
||||
.to_string(),
|
||||
json!(manifest).to_string().as_bytes(),
|
||||
)
|
||||
.with_content_type("application/json")
|
||||
.execute()
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
use std::path::PathBuf;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use droplet_rs::manifest::{ChunkData, Manifest};
|
||||
|
||||
@@ -5,6 +7,7 @@ use droplet_rs::manifest::{ChunkData, Manifest};
|
||||
pub trait Uploadable {
|
||||
async fn upload_chunk(
|
||||
&mut self,
|
||||
base_path: PathBuf,
|
||||
id: &String,
|
||||
version: &String,
|
||||
chunk_id: &String,
|
||||
|
||||
Reference in New Issue
Block a user