feat: S3 chunk uploading
This commit is contained in:
@@ -0,0 +1,95 @@
|
||||
use droplet_rs::manifest::ChunkData;
|
||||
use std::{
|
||||
cmp::min,
|
||||
fs::File,
|
||||
io::{Read, Seek, SeekFrom},
|
||||
task::Poll, vec::IntoIter,
|
||||
};
|
||||
use tokio::io::AsyncRead;
|
||||
|
||||
pub struct ChunkReader {
|
||||
files: IntoIter<LimitedFileReader>,
|
||||
active: Option<LimitedFileReader>,
|
||||
}
|
||||
|
||||
pub struct LimitedFileReader {
|
||||
file: File,
|
||||
to_read_remaining: usize,
|
||||
}
|
||||
|
||||
impl Read for LimitedFileReader {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
let to_read = min(self.to_read_remaining, buf.len());
|
||||
let (to_read, _remaining) = buf.split_at_mut(to_read);
|
||||
let read = self.file.read(to_read)?;
|
||||
self.to_read_remaining -= read;
|
||||
Ok(read)
|
||||
}
|
||||
}
|
||||
|
||||
impl ChunkReader {
|
||||
pub fn new(chunk: &ChunkData) -> Self {
|
||||
let files = chunk.files.iter().map(|f| {
|
||||
let mut file = File::open(&f.filename).unwrap();
|
||||
file.seek(SeekFrom::Start(f.start as u64)).unwrap(); // TODO: Fix unwraps
|
||||
LimitedFileReader {
|
||||
file,
|
||||
to_read_remaining: f.length,
|
||||
}
|
||||
}).collect::<Vec<LimitedFileReader>>().into_iter();
|
||||
Self {
|
||||
files: files,
|
||||
active: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
impl AsyncRead for ChunkReader {
|
||||
fn poll_read(
|
||||
self: std::pin::Pin<&mut Self>,
|
||||
_cx: &mut std::task::Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> std::task::Poll<std::io::Result<()>> {
|
||||
let mut s = self;
|
||||
loop {
|
||||
if let Some(active) = &mut s.active {
|
||||
match active.read(buf.initialize_unfilled()) {
|
||||
Ok(0) => {
|
||||
s.active = None;
|
||||
continue;
|
||||
}
|
||||
Ok(_) => return Poll::Ready(Ok(())),
|
||||
Err(e) => return Poll::Ready(Err(e)),
|
||||
}
|
||||
} else {
|
||||
if let Some(next) = s.files.next() {
|
||||
s.active = Some(next);
|
||||
} else {
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for ChunkReader {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
loop {
|
||||
if let Some(active) = &mut self.active {
|
||||
match active.read(buf) {
|
||||
Ok(0) => {
|
||||
self.active = None;
|
||||
continue;
|
||||
}
|
||||
Ok(n) => return Ok(n),
|
||||
Err(e) => return Err(e),
|
||||
}
|
||||
} else {
|
||||
if let Some(next) = self.files.next() {
|
||||
self.active = Some(next);
|
||||
} else {
|
||||
return Ok(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -2,3 +2,4 @@ pub mod interface;
|
||||
pub mod s3;
|
||||
pub mod speedtest;
|
||||
pub mod uploadable;
|
||||
pub mod chunk_reader;
|
||||
|
||||
@@ -1,10 +1,6 @@
|
||||
use crate::{
|
||||
commands::configure::s3::S3Config,
|
||||
commands::upload::{
|
||||
speedtest::{SPEEDTEST_PATH, Speedtest},
|
||||
uploadable::Uploadable,
|
||||
},
|
||||
};
|
||||
use crate::commands::{configure::s3::S3Config, upload::{
|
||||
chunk_reader::ChunkReader, speedtest::{SPEEDTEST_PATH, Speedtest}, uploadable::Uploadable
|
||||
}};
|
||||
use async_trait::async_trait;
|
||||
use droplet_rs::manifest::{ChunkData, Manifest};
|
||||
use s3::Bucket;
|
||||
@@ -30,7 +26,10 @@ impl Uploadable for S3 {
|
||||
chunk_id: &String,
|
||||
chunk: &ChunkData,
|
||||
) -> anyhow::Result<()> {
|
||||
todo!()
|
||||
let path = &PathBuf::from(id).join(version).join(chunk_id).to_string_lossy().to_string();
|
||||
let mut reader = ChunkReader::new(chunk);
|
||||
self.put_object_stream(&mut reader, path).await?;
|
||||
Ok(())
|
||||
}
|
||||
async fn upload_speedtest(&mut self) -> anyhow::Result<()> {
|
||||
if self.object_exists(SPEEDTEST_PATH).await? {
|
||||
|
||||
Reference in New Issue
Block a user