diff --git a/libraries/droplet/Cargo.lock b/libraries/droplet/Cargo.lock index cf7b8dac..2a1665cf 100644 --- a/libraries/droplet/Cargo.lock +++ b/libraries/droplet/Cargo.lock @@ -249,7 +249,7 @@ dependencies = [ [[package]] name = "droplet-rs" -version = "0.16.2" +version = "0.16.3" dependencies = [ "anyhow", "async-trait", diff --git a/libraries/droplet/Cargo.toml b/libraries/droplet/Cargo.toml index 0417f24b..5db83ec1 100644 --- a/libraries/droplet/Cargo.toml +++ b/libraries/droplet/Cargo.toml @@ -2,7 +2,7 @@ edition = "2021" authors = ["Drop-OSS"] name = "droplet-rs" -version = "0.16.2" +version = "0.16.3" license = "AGPL-3.0-only" description = "Droplet is a `napi.rs` Rust/Node.js package full of high-performance and low-level utils for Drop" diff --git a/libraries/droplet/src/manifest.rs b/libraries/droplet/src/manifest.rs index f634be16..f4c80138 100644 --- a/libraries/droplet/src/manifest.rs +++ b/libraries/droplet/src/manifest.rs @@ -156,7 +156,7 @@ pub async fn generate_manifest_rusty( let manifest = manifest.clone(); let reader_semaphore = reader_semaphore.clone(); futures.spawn(async move { - let mut read_buf = vec![0; 1024 * 1024 * 64]; + let mut read_buf = vec![0u8; 1024 * 1024 * 8]; let uuid = uuid::Uuid::new_v4().to_string(); let mut hasher = Sha256::new(); @@ -189,9 +189,10 @@ pub async fn generate_manifest_rusty( } total += amount; hasher.update(&read_buf[0..amount]); - if total as u64 > length { - panic!("read too much: target {}, got {}", length, total); - } + } + + if total as u64 > length { + panic!("read too much: target {}, got {}", length, total); } chunk_length += length; diff --git a/libraries/droplet/src/versions/archive_backend.rs b/libraries/droplet/src/versions/archive_backend.rs index 75f84de6..2c0b4f22 100644 --- a/libraries/droplet/src/versions/archive_backend.rs +++ b/libraries/droplet/src/versions/archive_backend.rs @@ -28,27 +28,53 @@ impl ZipVersionBackend { } } -struct ArchiveReader { +struct ArchiveReader<'a> { archive: FileReader, + prev_block: Option<&'a [u8]>, } -impl AsyncRead for ArchiveReader { +impl<'a> AsyncRead for ArchiveReader<'a> { fn poll_read( - self: std::pin::Pin<&mut Self>, + mut self: std::pin::Pin<&mut Self>, _cx: &mut std::task::Context<'_>, buf: &mut tokio::io::ReadBuf<'_>, ) -> std::task::Poll> { + if let Some(block) = &mut self.prev_block { + let to_read = buf.remaining().min(block.len()); + let result = block.split_off(..to_read); + let result = result.unwrap(); // SAFETY: above .min statement + buf.put_slice(result); + + // If the block is empty, we can read more + if block.is_empty() { + self.prev_block = None; + } else { + return Poll::Ready(Ok(())); + } + } let block = match self.archive.read_block() { Ok(v) => v, Err(err) => return Poll::Ready(Err(std::io::Error::other(err.to_string()))), }; - let block = match block { + let mut block = match block { Some(v) => v, None => return Poll::Ready(Ok(())), }; - buf.put_slice(block); + let write_amount = buf.remaining().min(block.len()); + let to_write = block.split_off(..write_amount); + let to_write = to_write.unwrap(); // SAFETY: above .min statement + buf.put_slice(to_write); + + if !block.is_empty() { + #[cfg(debug_assertions)] + if self.prev_block.is_some() { + panic!("replacing prev_block while it contains data") + } + self.prev_block.replace(&block[buf.remaining()..]); + } + Poll::Ready(Ok(())) } } @@ -95,7 +121,10 @@ impl VersionBackend for ZipVersionBackend { } } - Ok(Box::new(ArchiveReader { archive })) + Ok(Box::new(ArchiveReader { + archive, + prev_block: None, + })) } async fn peek_file(&self, sub_path: String) -> anyhow::Result {