Depot API & executor launch (#173)

* feat: depot api downloads

* feat: frontend fixes and experimental webview store

* feat: sync downloader

* feat: cleanup and fixes

* feat: encrypted database and fixed resuming

* feat: launch option selector

* fix: autostart when no options

* fix: clippy

* fix: clippy x2

* feat: executor launch

* feat: executor launch

* feat: not installed error handling

* feat: better offline handling

* feat: dependency popup

* fix: cancelation and resuming issues

* feat: dedup by platform

* feat: new ui for additional components and fix dl manager clog

* feat: auto-queue dependencies

* feat: depot scanning and ranking

* feat: new library fetching stack

* In-app store page (Windows + macOS) (#176)

* feat: async store loading

* feat: fix overscroll behaviour

* fix: query params in server protocol

* fix: clippy
This commit is contained in:
DecDuck
2026-01-20 00:40:48 +00:00
committed by GitHub
parent 55fdaf51e1
commit fc69ae30ab
72 changed files with 3430 additions and 2732 deletions
@@ -12,13 +12,13 @@ pub struct Collection {
name: String,
is_default: bool,
user_id: String,
entries: Vec<CollectionObject>,
pub entries: Vec<CollectionObject>,
}
#[derive(Serialize, Deserialize, Debug, Clone, Default, Encode, Decode)]
#[serde(rename_all = "camelCase")]
pub struct CollectionObject {
collection_id: String,
game_id: String,
game: Game,
pub collection_id: String,
pub game_id: String,
pub game: Game,
}
@@ -1,7 +1,8 @@
use async_trait::async_trait;
use database::{
ApplicationTransientStatus, DownloadType, DownloadableMetadata, borrow_db_checked,
borrow_db_mut_checked,
ApplicationTransientStatus, DownloadableMetadata, borrow_db_checked, borrow_db_mut_checked,
};
use download_manager::depot_manager::DepotManager;
use download_manager::download_manager_frontend::{DownloadManagerSignal, DownloadStatus};
use download_manager::downloadable::Downloadable;
use download_manager::error::ApplicationDownloadError;
@@ -9,60 +10,53 @@ use download_manager::util::download_thread_control_flag::{
DownloadThreadControl, DownloadThreadControlFlag,
};
use download_manager::util::progress_object::{ProgressHandle, ProgressObject};
use droplet_rs::manifest::Manifest;
use log::{debug, error, info, warn};
use rayon::ThreadPoolBuilder;
use remote::auth::generate_authorization_header;
use remote::error::RemoteAccessError;
use remote::requests::generate_url;
use remote::utils::{DROP_CLIENT_ASYNC, DROP_CLIENT_SYNC};
use std::collections::{HashMap, HashSet};
use std::fs::{OpenOptions, create_dir_all};
use std::io;
use remote::utils::DROP_CLIENT_ASYNC;
use std::fmt::Debug;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::mpsc::Sender;
use std::sync::{Arc, Mutex};
use std::time::Instant;
use tauri::AppHandle;
use tokio::sync::mpsc::Sender;
use utils::{app_emit, lock, send};
#[cfg(target_os = "linux")]
use rustix::fs::{FallocateFlags, fallocate};
use crate::downloads::manifest::{
DownloadBucket, DownloadContext, DownloadDrop, DropManifest, DropValidateContext, ManifestBody,
};
use crate::downloads::utils::get_disk_available;
use crate::downloads::validate::validate_game_chunk;
use crate::library::{on_game_complete, push_game_update, set_partially_installed};
use crate::state::GameStatusManager;
use super::download_logic::download_game_bucket;
use super::download_logic::download_game_chunk;
use super::drop_data::DropData;
static RETRY_COUNT: usize = 3;
const TARGET_BUCKET_SIZE: usize = 63 * 1000 * 1000;
const MAX_FILES_PER_BUCKET: usize = (1024 / 4) - 1;
pub struct GameDownloadAgent {
pub id: String,
pub version: String,
pub metadata: DownloadableMetadata,
pub control_flag: DownloadThreadControl,
buckets: Mutex<Vec<DownloadBucket>>,
context_map: Mutex<HashMap<String, bool>>,
pub manifest: Mutex<Option<DropManifest>>,
pub manifest: Mutex<Option<Manifest>>,
pub progress: Arc<ProgressObject>,
depot_manager: Arc<DepotManager>,
sender: Sender<DownloadManagerSignal>,
pub dropdata: DropData,
status: Mutex<DownloadStatus>,
}
impl Debug for GameDownloadAgent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("GameDownloadAgent").finish()
}
}
impl GameDownloadAgent {
pub async fn new_from_index(
id: String,
version: String,
metadata: DownloadableMetadata,
target_download_dir: usize,
sender: Sender<DownloadManagerSignal>,
depot_manager: Arc<DepotManager>,
) -> Result<Self, ApplicationDownloadError> {
let base_dir = {
let db_lock = borrow_db_checked();
@@ -70,53 +64,43 @@ impl GameDownloadAgent {
db_lock.applications.install_dirs[target_download_dir].clone()
};
Self::new(id, version, base_dir, sender).await
Self::new(metadata, base_dir, sender, depot_manager).await
}
pub async fn new(
id: String,
version: String,
metadata: DownloadableMetadata,
base_dir: PathBuf,
sender: Sender<DownloadManagerSignal>,
depot_manager: Arc<DepotManager>,
) -> Result<Self, ApplicationDownloadError> {
// Don't run by default
let control_flag = DownloadThreadControl::new(DownloadThreadControlFlag::Stop);
let base_dir_path = Path::new(&base_dir);
let data_base_dir_path = base_dir_path.join(id.clone());
let stored_manifest =
DropData::generate(id.clone(), version.clone(), data_base_dir_path.clone());
let context_lock = stored_manifest.contexts.lock().unwrap().clone();
info!("base dir {}", base_dir_path.display());
let data_base_dir_path = base_dir_path.join(metadata.id.clone());
info!("data dir path {}", data_base_dir_path.display());
let stored_manifest = DropData::generate(
metadata.id.clone(),
metadata.version.clone(),
metadata.target_platform,
data_base_dir_path.clone(),
);
let result = Self {
id,
version,
metadata,
control_flag,
manifest: Mutex::new(None),
buckets: Mutex::new(Vec::new()),
context_map: Mutex::new(HashMap::new()),
progress: Arc::new(ProgressObject::new(0, 0, sender.clone())),
sender,
dropdata: stored_manifest,
status: Mutex::new(DownloadStatus::Queued),
depot_manager,
};
result.ensure_manifest_exists().await?;
let required_space = lock!(result.manifest)
.as_ref()
.unwrap()
.values()
.map(|e| {
e.lengths
.iter()
.enumerate()
.filter(|(i, _)| *context_lock.get(&e.checksums[*i]).unwrap_or(&false))
.map(|(_, v)| v)
.sum::<usize>()
})
.sum::<usize>() as u64;
let required_space = lock!(result.manifest).as_ref().unwrap().size;
let available_space = get_disk_available(data_base_dir_path)? as u64;
@@ -134,7 +118,7 @@ impl GameDownloadAgent {
pub fn setup_download(&self, app_handle: &AppHandle) -> Result<(), ApplicationDownloadError> {
let mut db_lock = borrow_db_mut_checked();
let status = ApplicationTransientStatus::Downloading {
version_name: self.version.clone(),
version_id: self.metadata.version.clone(),
};
db_lock
.applications
@@ -147,25 +131,26 @@ impl GameDownloadAgent {
return Err(ApplicationDownloadError::NotInitialized);
}
self.ensure_buckets()?;
self.control_flag.set(DownloadThreadControlFlag::Go);
Ok(())
}
// Blocking
pub fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
pub async fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
self.setup_download(app_handle)?;
let timer = Instant::now();
info!("beginning download for {}...", self.metadata().id);
let res = self.run().map_err(ApplicationDownloadError::Communication);
let res = self
.run()
.await
.map_err(ApplicationDownloadError::Communication);
debug!(
"{} took {}ms to download",
self.id,
self.metadata.id,
timer.elapsed().as_millis()
);
res
@@ -187,7 +172,10 @@ impl GameDownloadAgent {
let client = DROP_CLIENT_ASYNC.clone();
let url = generate_url(
&["/api/v1/client/game/manifest"],
&[("id", &self.id), ("version", &self.version)],
&[
("id", &self.metadata.id),
("version", &self.metadata.version),
],
)
.map_err(ApplicationDownloadError::Communication)?;
@@ -207,7 +195,7 @@ impl GameDownloadAgent {
));
}
let manifest_download: DropManifest = response
let manifest_download: Manifest = response
.json()
.await
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
@@ -222,332 +210,159 @@ impl GameDownloadAgent {
// Sets it up for both download and validate
fn setup_progress(&self) {
let buckets = lock!(self.buckets);
let manifest = lock!(self.manifest);
let manifest = manifest.as_ref().unwrap();
let chunk_count = buckets.iter().map(|e| e.drops.len()).sum();
let total_length = buckets
.iter()
.map(|bucket| bucket.drops.iter().map(|e| e.length).sum::<usize>())
.sum();
self.progress.set_max(total_length);
self.progress.set_size(chunk_count);
self.progress.set_max(manifest.size.try_into().unwrap());
self.progress.set_size(manifest.chunks.len());
self.progress.reset();
}
pub fn ensure_buckets(&self) -> Result<(), ApplicationDownloadError> {
if lock!(self.buckets).is_empty() {
self.generate_buckets()?;
}
*lock!(self.context_map) = self.dropdata.get_contexts();
Ok(())
}
pub fn generate_buckets(&self) -> Result<(), ApplicationDownloadError> {
let manifest = lock!(self.manifest)
.clone()
.ok_or(ApplicationDownloadError::NotInitialized)?;
let game_id = self.id.clone();
let base_path = Path::new(&self.dropdata.base_path);
create_dir_all(base_path)?;
let mut buckets = Vec::new();
let mut current_buckets = HashMap::<String, DownloadBucket>::new();
let mut current_bucket_sizes = HashMap::<String, usize>::new();
for (raw_path, chunk) in manifest {
let path = base_path.join(Path::new(&raw_path));
let container = path
.parent()
.ok_or(ApplicationDownloadError::IoError(Arc::new(io::Error::new(
io::ErrorKind::NotFound,
"no parent directory",
))))?;
create_dir_all(container)?;
let already_exists = path.exists();
let file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.truncate(false)
.open(&path)?;
let mut file_running_offset = 0;
for (index, length) in chunk.lengths.iter().enumerate() {
let drop = DownloadDrop {
filename: raw_path.to_string(),
start: file_running_offset,
length: *length,
checksum: chunk.checksums[index].clone(),
permissions: chunk.permissions,
path: path.clone(),
index,
};
file_running_offset += *length;
if *length >= TARGET_BUCKET_SIZE {
// They get their own bucket
buckets.push(DownloadBucket {
game_id: game_id.clone(),
version: chunk.version_name.clone(),
drops: vec![drop],
});
continue;
}
let current_bucket_size = current_bucket_sizes
.entry(chunk.version_name.clone())
.or_insert_with(|| 0);
let c_version_name = chunk.version_name.clone();
let c_game_id = game_id.clone();
let current_bucket = current_buckets
.entry(chunk.version_name.clone())
.or_insert_with(|| DownloadBucket {
game_id: c_game_id,
version: c_version_name,
drops: vec![],
});
if (*current_bucket_size + length >= TARGET_BUCKET_SIZE
|| current_bucket.drops.len() >= MAX_FILES_PER_BUCKET)
&& !current_bucket.drops.is_empty()
{
// Move current bucket into list and make a new one
buckets.push(current_bucket.clone());
*current_bucket = DownloadBucket {
game_id: game_id.clone(),
version: chunk.version_name.clone(),
drops: vec![],
};
*current_bucket_size = 0;
}
current_bucket.drops.push(drop);
*current_bucket_size += *length;
}
#[cfg(target_os = "linux")]
if file_running_offset > 0 && !already_exists {
let _ = fallocate(file, FallocateFlags::empty(), 0, file_running_offset as u64);
}
}
for (_, bucket) in current_buckets.into_iter() {
if !bucket.drops.is_empty() {
buckets.push(bucket);
}
}
info!("buckets: {}", buckets.len());
let existing_contexts = self.dropdata.get_contexts();
self.dropdata.set_contexts(
&buckets
.iter()
.flat_map(|x| x.drops.iter().map(|v| v.checksum.clone()))
.map(|x| {
let contains = existing_contexts.get(&x).unwrap_or(&false);
(x, *contains)
})
.collect::<Vec<(String, bool)>>(),
);
*lock!(self.buckets) = buckets;
Ok(())
}
fn run(&self) -> Result<bool, RemoteAccessError> {
async fn run(&self) -> Result<bool, RemoteAccessError> {
self.depot_manager.sync_depots().await?;
self.setup_progress();
let (chunks, key) = {
let manifest = lock!(self.manifest);
let manifest = manifest.as_ref().unwrap();
(manifest.chunks.clone(), manifest.key)
};
let chunk_len = chunks.len();
let mut completed_chunks = {
let completed_chunks = lock!(self.dropdata.contexts);
completed_chunks.clone()
};
let max_download_threads = borrow_db_checked().settings.max_download_threads;
debug!(
"downloading game: {} with {} threads",
self.id, max_download_threads
);
let pool = ThreadPoolBuilder::new()
.num_threads(max_download_threads)
.build()
.unwrap_or_else(|_| {
panic!("failed to build thread pool with {max_download_threads} threads")
});
let (sender, recv) = crossbeam_channel::bounded(16);
let buckets = lock!(self.buckets);
let unsafe_self: &'static GameDownloadAgent = unsafe { mem::transmute(self) };
let local_completed_chunks = completed_chunks.clone();
let mut download_contexts = HashMap::<String, DownloadContext>::new();
let download_join_handle = tauri::async_runtime::spawn_blocking(move || {
let thread_pool = rayon::ThreadPoolBuilder::new()
.num_threads(max_download_threads)
.build()
.unwrap();
thread_pool.scope(move |s| {
for (index, (chunk_id, chunk_data)) in chunks.into_iter().enumerate() {
let local_sender = sender.clone();
let progress = unsafe_self.progress.get(index);
let progress_handle =
ProgressHandle::new(progress, unsafe_self.progress.clone());
let versions = buckets
.iter()
.map(|e| &e.version)
.collect::<HashSet<_>>()
.into_iter()
.cloned()
.collect::<Vec<String>>();
let chunk_length = chunk_data.files.iter().map(|v| v.length).sum();
info!("downloading across these versions: {versions:?}");
if *local_completed_chunks.get(&chunk_id).unwrap_or(&false) {
progress_handle.skip(chunk_length);
continue;
}
let completed_contexts = Arc::new(boxcar::Vec::new());
let completed_indexes_loop_arc = completed_contexts.clone();
for version in versions {
let download_context = DROP_CLIENT_SYNC
.post(generate_url(&["/api/v2/client/context"], &[])?)
.json(&ManifestBody {
game: self.id.clone(),
version: version.clone(),
})
.header("Authorization", generate_authorization_header())
.send()?;
if download_context.status() != 200 {
return Err(RemoteAccessError::InvalidResponse(download_context.json()?));
}
let download_context = download_context.json::<DownloadContext>()?;
info!(
"download context: ({}) {}",
&version, download_context.context
);
download_contexts.insert(version, download_context);
}
let download_contexts = &download_contexts;
pool.scope(|scope| {
let context_map = lock!(self.context_map);
for (index, bucket) in buckets.iter().enumerate() {
let mut bucket = (*bucket).clone();
let completed_contexts = completed_indexes_loop_arc.clone();
let progress = self.progress.get(index);
let progress_handle = ProgressHandle::new(progress, self.progress.clone());
// If we've done this one already, skip it
// Note to future DecDuck, DropData gets loaded into context_map
let todo_drops = bucket
.drops
.into_iter()
.filter(|e| {
let todo = !*context_map.get(&e.checksum).unwrap_or(&false);
if !todo {
progress_handle.skip(e.length);
let sender = unsafe_self.sender.clone();
let (depot, permit) = match unsafe_self
.depot_manager
.next_depot(&unsafe_self.metadata.id, &unsafe_self.metadata.version)
{
Ok(v) => v,
Err(err) => {
tauri::async_runtime::spawn(async move {
send!(sender, DownloadManagerSignal::Error(ApplicationDownloadError::Communication(err)));
});
return;
}
todo
})
.collect::<Vec<DownloadDrop>>();
};
if todo_drops.is_empty() {
continue;
};
bucket.drops = todo_drops;
let sender = self.sender.clone();
let download_context =
download_contexts.get(&bucket.version).unwrap_or_else(|| {
panic!(
"Could not get bucket version {}. Corrupted state.",
bucket.version
)
});
scope.spawn(move |_| {
// 3 attempts
for i in 0..RETRY_COUNT {
let loop_progress_handle = progress_handle.clone();
match download_game_bucket(
&bucket,
download_context,
&self.control_flag,
loop_progress_handle,
) {
Ok(true) => {
for drop in bucket.drops {
completed_contexts.push(drop.checksum);
}
return;
}
Ok(false) => return,
Err(e) => {
warn!("game download agent error: {e}");
let retry = matches!(
&e,
ApplicationDownloadError::Communication(_)
| ApplicationDownloadError::Checksum
| ApplicationDownloadError::Lock
| ApplicationDownloadError::IoError(_)
);
if i == RETRY_COUNT - 1 || !retry {
warn!("retry logic failed, not re-attempting.");
send!(sender, DownloadManagerSignal::Error(e));
s.spawn(move |_| {
for i in 0..RETRY_COUNT {
let loop_progress_handle = progress_handle.clone();
let base_path = unsafe_self.dropdata.base_path.clone();
match download_game_chunk(
&unsafe_self.metadata.id,
&unsafe_self.metadata.version,
&chunk_id,
&depot,
&key,
&chunk_data,
base_path,
&unsafe_self.control_flag,
loop_progress_handle,
) {
Ok(true) => {
local_sender.send(chunk_id.clone()).unwrap();
drop(permit); // Take ownership
return;
}
Ok(false) => return,
Err(e) => {
warn!("got error for chunk id {}: {e:?}", chunk_id);
let retry = true; /*matches!(
&e,
ApplicationDownloadError::Communication(_)
| ApplicationDownloadError::Checksum
| ApplicationDownloadError::Lock
| ApplicationDownloadError::IoError(_)
);*/
if i == RETRY_COUNT - 1 || !retry {
warn!("retry logic failed, not re-attempting.");
tauri::async_runtime::spawn(async move {
send!(sender, DownloadManagerSignal::Error(e));
});
return;
}
}
}
}
}
});
}
});
}
drop(sender);
});
});
let newly_completed = completed_contexts.clone();
let mut outputs = Vec::new();
while let Ok(chunk_id) = recv.recv() {
outputs.push(chunk_id);
}
let completed_lock_len = {
let mut context_map_lock = lock!(self.context_map);
for (_, item) in newly_completed.iter() {
context_map_lock.insert(item.clone(), true);
}
download_join_handle
.await
.expect("failed to complete download");
context_map_lock.values().filter(|x| **x).count()
};
for completed_chunk in outputs {
completed_chunks.insert(completed_chunk, true);
}
let context_map_lock = lock!(self.context_map);
let contexts = buckets
let drop_data_chunks = completed_chunks
.iter()
.flat_map(|x| x.drops.iter().map(|e| e.checksum.clone()))
.map(|x| {
let completed = context_map_lock.get(&x).unwrap_or(&false);
(x, *completed)
})
.map(|v| (v.0.to_string(), *v.1))
.collect::<Vec<(String, bool)>>();
drop(context_map_lock);
self.dropdata.set_contexts(&contexts);
self.dropdata.set_contexts(&drop_data_chunks);
self.dropdata.write();
info!("completed {} chunks", drop_data_chunks.len());
// If there are any contexts left which are false
if !contexts.iter().all(|x| x.1) {
if completed_chunks.len() != chunk_len {
info!(
"download agent for {} exited without completing ({}/{}) ({} buckets)",
self.id.clone(),
completed_lock_len,
contexts.len(),
buckets.len()
"download agent for {} exited without completing ({}/{})",
self.metadata.id.clone(),
completed_chunks.len(),
chunk_len,
);
return Ok(false);
}
Ok(true)
}
#[allow(dead_code)]
fn setup_validate(&self, app_handle: &AppHandle) {
self.setup_progress();
self.control_flag.set(DownloadThreadControlFlag::Go);
let status = ApplicationTransientStatus::Validating {
version_name: self.version.clone(),
version_id: self.metadata.version.clone(),
};
let mut db_lock = borrow_db_mut_checked();
@@ -558,7 +373,8 @@ impl GameDownloadAgent {
push_game_update(app_handle, &self.metadata().id, None, (None, Some(status)));
}
pub fn validate(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
pub fn validate(&self, _app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
/*
self.setup_validate(app_handle);
let buckets = lock!(self.buckets);
@@ -612,6 +428,7 @@ impl GameDownloadAgent {
return Ok(false);
}
*/
Ok(true)
}
@@ -628,10 +445,11 @@ impl GameDownloadAgent {
}
}
#[async_trait]
impl Downloadable for GameDownloadAgent {
fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
async fn download(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
*lock!(self.status) = DownloadStatus::Downloading;
self.download(app_handle)
self.download(app_handle).await
}
fn validate(&self, app_handle: &AppHandle) -> Result<bool, ApplicationDownloadError> {
@@ -648,24 +466,20 @@ impl Downloadable for GameDownloadAgent {
}
fn metadata(&self) -> DownloadableMetadata {
DownloadableMetadata {
id: self.id.clone(),
version: Some(self.version.clone()),
download_type: DownloadType::Game,
}
self.metadata.clone()
}
fn on_queued(&self, app_handle: &tauri::AppHandle) {
*self.status.lock().unwrap() = DownloadStatus::Queued;
let mut db_lock = borrow_db_mut_checked();
let status = ApplicationTransientStatus::Queued {
version_name: self.version.clone(),
version_id: self.metadata.version.clone(),
};
db_lock
.applications
.transient_statuses
.insert(self.metadata(), status.clone());
push_game_update(app_handle, &self.id, None, (None, Some(status)));
push_game_update(app_handle, &self.metadata.id, None, (None, Some(status)));
}
fn on_error(&self, app_handle: &tauri::AppHandle, error: &ApplicationDownloadError) {
@@ -682,18 +496,20 @@ impl Downloadable for GameDownloadAgent {
push_game_update(
app_handle,
&self.id,
&self.metadata.id,
None,
GameStatusManager::fetch_state(&self.id, &handle),
GameStatusManager::fetch_state(&self.metadata.id, &handle),
);
}
fn on_complete(&self, app_handle: &tauri::AppHandle) {
async fn on_complete(&self, app_handle: &tauri::AppHandle) {
match on_game_complete(
&self.metadata(),
self.dropdata.base_path.to_string_lossy().to_string(),
app_handle,
) {
)
.await
{
Ok(_) => {}
Err(e) => {
error!("could not mark game as complete: {e}");
@@ -706,7 +522,6 @@ impl Downloadable for GameDownloadAgent {
}
fn on_cancelled(&self, app_handle: &tauri::AppHandle) {
info!("cancelled {}", self.id);
self.cancel(app_handle);
}
@@ -1,175 +1,38 @@
use std::fs::{Permissions, set_permissions};
use std::io::Read;
use std::io::{Read, Seek as _, SeekFrom, Write as _};
#[cfg(unix)]
use std::os::unix::fs::PermissionsExt;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Instant;
use std::{
fs::{File, OpenOptions},
io::{self, BufWriter, Seek, SeekFrom, Write},
path::PathBuf,
};
use aes::cipher::{KeyIvInit, StreamCipher};
use download_manager::error::ApplicationDownloadError;
use download_manager::util::download_thread_control_flag::{
DownloadThreadControl, DownloadThreadControlFlag,
};
use download_manager::util::progress_object::ProgressHandle;
use log::{debug, info, warn};
use md5::{Context, Digest};
use droplet_rs::manifest::ChunkData;
use log::{debug, info};
use remote::auth::generate_authorization_header;
use remote::error::{DropServerError, RemoteAccessError};
use remote::requests::generate_url;
use remote::utils::DROP_CLIENT_SYNC;
use reqwest::blocking::Response;
use sha2::Digest;
use tauri::Url;
use crate::downloads::manifest::{ChunkBody, DownloadBucket, DownloadContext, DownloadDrop};
const READ_BUF_LEN: usize = 1024 * 1024;
static MAX_PACKET_LENGTH: usize = 4096 * 4;
static BUMP_SIZE: usize = 4096 * 16;
type Aes128Ctr64LE = ctr::Ctr64LE<aes::Aes128>;
pub struct DropWriter<W: Write> {
hasher: Context,
destination: BufWriter<W>,
progress: ProgressHandle,
}
impl DropWriter<File> {
fn new(path: PathBuf, progress: ProgressHandle) -> Result<Self, io::Error> {
let destination = OpenOptions::new()
.write(true)
.create(true)
.truncate(false)
.open(&path)
.inspect_err(|_v| warn!("failed to open {}", path.display()))?;
Ok(Self {
destination: BufWriter::with_capacity(1024 * 1024, destination),
hasher: Context::new(),
progress,
})
}
fn finish(mut self) -> io::Result<Digest> {
self.flush()?;
Ok(self.hasher.finalize())
}
}
// Write automatically pushes to file and hasher
impl Write for DropWriter<File> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.hasher
.write_all(buf)
.map_err(|e| io::Error::other(format!("Unable to write to hasher: {e}")))?;
let bytes_written = self.destination.write(buf)?;
self.progress.add(bytes_written);
Ok(bytes_written)
}
fn flush(&mut self) -> io::Result<()> {
self.hasher.flush()?;
self.destination.flush()
}
}
// Seek moves around destination output
impl Seek for DropWriter<File> {
fn seek(&mut self, pos: SeekFrom) -> io::Result<u64> {
self.destination.seek(pos)
}
}
pub struct DropDownloadPipeline<'a, R: Read, W: Write> {
pub source: R,
pub drops: Vec<DownloadDrop>,
pub destination: Vec<DropWriter<W>>,
pub control_flag: &'a DownloadThreadControl,
#[allow(dead_code)]
progress: ProgressHandle,
}
impl<'a> DropDownloadPipeline<'a, Response, File> {
fn new(
source: Response,
drops: Vec<DownloadDrop>,
control_flag: &'a DownloadThreadControl,
progress: ProgressHandle,
) -> Result<Self, io::Error> {
Ok(Self {
source,
destination: drops
.iter()
.map(|drop| DropWriter::new(drop.path.clone(), progress.clone()))
.try_collect()?,
drops,
control_flag,
progress,
})
}
fn copy(&mut self) -> Result<bool, io::Error> {
let mut copy_buffer = [0u8; MAX_PACKET_LENGTH];
for (index, drop) in self.drops.iter().enumerate() {
let destination = self
.destination
.get_mut(index)
.ok_or(io::Error::other("no destination"))?;
let mut remaining = drop.length;
if drop.start != 0 {
destination.seek(SeekFrom::Start(drop.start as u64))?;
}
let mut last_bump = 0;
loop {
let size = MAX_PACKET_LENGTH.min(remaining);
let size = self
.source
.read(&mut copy_buffer[0..size])
.inspect_err(|_| {
warn!("got error from {}", drop.filename);
})?;
remaining -= size;
last_bump += size;
destination.write_all(&copy_buffer[0..size])?;
if last_bump > BUMP_SIZE {
last_bump -= BUMP_SIZE;
if self.control_flag.get() == DownloadThreadControlFlag::Stop {
return Ok(false);
}
}
if remaining == 0 {
break;
};
}
if self.control_flag.get() == DownloadThreadControlFlag::Stop {
return Ok(false);
}
}
Ok(true)
}
#[allow(dead_code)]
fn debug_skip_checksum(self) {
self.destination
.into_iter()
.for_each(|mut e| e.flush().unwrap());
}
fn finish(self) -> Result<Vec<Digest>, io::Error> {
let checksums = self
.destination
.into_iter()
.map(|e| e.finish())
.try_collect()?;
Ok(checksums)
}
}
pub fn download_game_bucket(
bucket: &DownloadBucket,
ctx: &DownloadContext,
#[allow(clippy::too_many_arguments)]
pub fn download_game_chunk(
game_id: &str,
version_id: &str,
chunk_id: &str,
depot: &str,
key: &[u8; 16],
chunk_data: &ChunkData,
base_path: PathBuf,
control_flag: &DownloadThreadControl,
progress: ProgressHandle,
) -> Result<bool, ApplicationDownloadError> {
@@ -183,14 +46,16 @@ pub fn download_game_bucket(
let header = generate_authorization_header();
let url = generate_url(&["/api/v2/client/chunk"], &[])
.map_err(ApplicationDownloadError::Communication)?;
let body = ChunkBody::create(ctx, &bucket.drops);
let url = Url::parse(depot)
.map_err(|v| ApplicationDownloadError::DownloadError(v.into()))?
.join(&format!(
"content/{}/{}/{}",
game_id, version_id, chunk_id
))
.map_err(|v| ApplicationDownloadError::DownloadError(v.into()))?;
let response = DROP_CLIENT_SYNC
.post(url)
.json(&body)
.get(url)
.header("Authorization", header)
.send()
.map_err(|e| ApplicationDownloadError::Communication(e.into()))?;
@@ -198,7 +63,7 @@ pub fn download_game_bucket(
if response.status() != 200 {
info!("chunk request got status code: {}", response.status());
let raw_res = response.text().map_err(|e| {
ApplicationDownloadError::Communication(RemoteAccessError::FetchError(e.into()))
ApplicationDownloadError::Communication(RemoteAccessError::FetchErrorLegacy(e.into()))
})?;
info!("{raw_res}");
if let Ok(err) = serde_json::from_str::<DropServerError>(&raw_res) {
@@ -211,92 +76,70 @@ pub fn download_game_bucket(
));
}
let lengths = response
.headers()
.get("Content-Lengths")
.ok_or(ApplicationDownloadError::Communication(
RemoteAccessError::UnparseableResponse("missing Content-Lengths header".to_owned()),
))?
.to_str()
.map_err(|e| {
ApplicationDownloadError::Communication(RemoteAccessError::UnparseableResponse(
e.to_string(),
))
})?;
for (i, raw_length) in lengths.split(",").enumerate() {
let length = raw_length.parse::<usize>().unwrap_or(0);
let Some(drop) = bucket.drops.get(i) else {
warn!("invalid number of Content-Lengths recieved: {i}, {lengths}");
return Err(ApplicationDownloadError::DownloadError(
RemoteAccessError::InvalidResponse(DropServerError {
status_code: 400,
status_message: "Server Error".to_owned(),
message: format!(
"invalid number of Content-Lengths recieved: {i}, {lengths}"
),
}),
));
};
if drop.length != length {
warn!(
"for {}, expected {}, got {} ({})",
drop.filename, drop.length, raw_length, length
);
return Err(ApplicationDownloadError::DownloadError(
RemoteAccessError::InvalidResponse(DropServerError {
status_code: 400,
status_message: "Server Error".to_owned(),
message: format!(
"for {}, expected {}, got {} ({})",
drop.filename, drop.length, raw_length, length
),
}),
));
}
if control_flag.get() == DownloadThreadControlFlag::Stop {
progress.set(0);
return Ok(false);
}
let timestep = start.elapsed().as_millis();
debug!("took {}ms to start downloading", timestep);
let mut pipeline =
DropDownloadPipeline::new(response, bucket.drops.clone(), control_flag, progress)
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
/*let stream = response
.bytes_stream()
.map(|v| v.map_err(|err| std::io::Error::other(err)));
let mut stream_reader = StreamReader::new(stream);*/
let mut stream_reader = response;
let completed = pipeline
.copy()
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
if !completed {
return Ok(false);
}
let mut hasher = sha2::Sha256::new();
let mut cipher = Aes128Ctr64LE::new(key.into(), &chunk_data.iv.into());
let mut read_buf = vec![0u8; READ_BUF_LEN];
for file in &chunk_data.files {
let path = base_path.join(file.filename.clone());
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let mut file_handle = std::fs::OpenOptions::new()
.truncate(false)
.write(true)
.append(false)
.create(true)
.open(&path)?;
file_handle.seek(SeekFrom::Start(file.start.try_into().unwrap()))?;
// If we complete the file, set the permissions (if on Linux)
#[cfg(unix)]
{
for drop in bucket.drops.iter() {
let permission = if drop.permissions == 0 {
let mut remaining = file.length;
while remaining > 0 {
let amount = stream_reader.read(&mut read_buf[0..remaining.min(READ_BUF_LEN)])?;
progress.add(amount);
remaining -= amount;
cipher.apply_keystream(&mut read_buf[0..amount]);
hasher.update(&read_buf[0..amount]);
file_handle.write_all(&read_buf[0..amount])?;
}
#[cfg(unix)]
{
drop(file_handle);
let permissions = if file.permissions == 0 {
0o744
} else {
drop.permissions
file.permissions
};
let permissions = Permissions::from_mode(permission);
set_permissions(drop.path.clone(), permissions)
let permissions = Permissions::from_mode(permissions);
set_permissions(path, permissions)
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
}
if control_flag.get() == DownloadThreadControlFlag::Stop {
progress.set(0);
return Ok(false);
}
}
let checksums = pipeline
.finish()
.map_err(|e| ApplicationDownloadError::IoError(Arc::new(e)))?;
for (index, drop) in bucket.drops.iter().enumerate() {
let res = hex::encode(**checksums.get(index).unwrap());
if res != drop.checksum {
warn!("context didn't match... doing nothing because we will validate later.");
// return Ok(false);
// return Err(ApplicationDownloadError::Checksum);
}
let digest = hex::encode(hasher.finalize());
if digest != chunk_data.checksum {
return Err(ApplicationDownloadError::Checksum);
}
Ok(true)
@@ -5,17 +5,19 @@ use std::{
path::{Path, PathBuf},
};
use database::platform::Platform;
use log::error;
use native_model::{Decode, Encode};
use utils::lock;
pub type DropData = v1::DropData;
pub static DROP_DATA_PATH: &str = ".dropdata";
pub static DROPDATA_PATH: &str = ".dropdata";
pub mod v1 {
use std::{collections::HashMap, path::PathBuf, sync::Mutex};
use database::platform::Platform;
use native_model::native_model;
use serde::{Deserialize, Serialize};
@@ -24,16 +26,18 @@ pub mod v1 {
pub struct DropData {
pub game_id: String,
pub game_version: String,
pub target_platform: Platform,
pub contexts: Mutex<HashMap<String, bool>>,
pub base_path: PathBuf,
}
impl DropData {
pub fn new(game_id: String, game_version: String, base_path: PathBuf) -> Self {
pub fn new(game_id: String, game_version: String, target_platform: Platform, base_path: PathBuf) -> Self {
Self {
base_path,
game_id,
game_version,
target_platform,
contexts: Mutex::new(HashMap::new()),
}
}
@@ -41,14 +45,14 @@ pub mod v1 {
}
impl DropData {
pub fn generate(game_id: String, game_version: String, base_path: PathBuf) -> Self {
pub fn generate(game_id: String, game_version: String, target_platform: Platform, base_path: PathBuf) -> Self {
match DropData::read(&base_path) {
Ok(v) => v,
Err(_) => DropData::new(game_id, game_version, base_path),
Err(_) => DropData::new(game_id, game_version, target_platform, base_path),
}
}
pub fn read(base_path: &Path) -> Result<Self, io::Error> {
let mut file = File::open(base_path.join(DROP_DATA_PATH))?;
let mut file = File::open(base_path.join(DROPDATA_PATH))?;
let mut s = Vec::new();
file.read_to_end(&mut s)?;
@@ -66,7 +70,7 @@ impl DropData {
Err(_) => return,
};
let mut file = match File::create(self.base_path.join(DROP_DATA_PATH)) {
let mut file = match File::create(self.base_path.join(DROPDATA_PATH)) {
Ok(file) => file,
Err(e) => {
error!("{e}");
@@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
#[derive(Debug, Clone, Serialize)]
@@ -21,57 +20,6 @@ pub struct DownloadBucket {
pub drops: Vec<DownloadDrop>,
}
#[derive(Deserialize)]
pub struct DownloadContext {
pub context: String,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChunkBodyFile {
filename: String,
chunk_index: usize,
}
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ChunkBody {
pub context: String,
pub files: Vec<ChunkBodyFile>,
}
#[derive(Serialize)]
pub struct ManifestBody {
pub game: String,
pub version: String,
}
impl ChunkBody {
pub fn create(context: &DownloadContext, drops: &[DownloadDrop]) -> ChunkBody {
Self {
context: context.context.clone(),
files: drops
.iter()
.map(|e| ChunkBodyFile {
filename: e.filename.clone(),
chunk_index: e.index,
})
.collect(),
}
}
}
pub type DropManifest = HashMap<String, DropChunk>;
#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct DropChunk {
pub permissions: u32,
pub ids: Vec<String>,
pub checksums: Vec<String>,
pub lengths: Vec<usize>,
pub version_name: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DropValidateContext {
pub index: usize,
+1 -2
View File
@@ -3,5 +3,4 @@ mod download_logic;
pub mod drop_data;
pub mod error;
mod manifest;
pub mod utils;
pub mod validate;
pub mod utils;
@@ -1,104 +0,0 @@
use std::{
fs::File,
io::{self, BufWriter, Read, Seek, SeekFrom, Write},
};
use download_manager::{
error::ApplicationDownloadError,
util::{
download_thread_control_flag::{DownloadThreadControl, DownloadThreadControlFlag},
progress_object::ProgressHandle,
},
};
use log::debug;
use md5::Context;
use crate::downloads::manifest::DropValidateContext;
pub fn validate_game_chunk(
ctx: &DropValidateContext,
control_flag: &DownloadThreadControl,
progress: ProgressHandle,
) -> Result<bool, ApplicationDownloadError> {
debug!(
"Starting chunk validation {}, {}, {} #{}",
ctx.path.display(),
ctx.index,
ctx.offset,
ctx.checksum
);
// If we're paused
if control_flag.get() == DownloadThreadControlFlag::Stop {
progress.set(0);
return Ok(false);
}
let Ok(mut source) = File::open(&ctx.path) else {
return Ok(false);
};
if ctx.offset != 0 {
source
.seek(SeekFrom::Start(ctx.offset as u64))
.expect("Failed to seek to file offset");
}
let mut hasher = md5::Context::new();
let completed = validate_copy(&mut source, &mut hasher, ctx.length, control_flag, progress)?;
if !completed {
return Ok(false);
}
let res = hex::encode(hasher.finalize().0);
if res != ctx.checksum {
return Ok(false);
}
debug!(
"Successfully finished verification #{}, copied {} bytes",
ctx.checksum, ctx.length
);
Ok(true)
}
fn validate_copy(
source: &mut File,
dest: &mut Context,
size: usize,
control_flag: &DownloadThreadControl,
progress: ProgressHandle,
) -> Result<bool, io::Error> {
let copy_buf_size = 512;
let mut copy_buf = vec![0; copy_buf_size];
let mut buf_writer = BufWriter::with_capacity(1024 * 1024, dest);
let mut total_bytes = 0;
loop {
if control_flag.get() == DownloadThreadControlFlag::Stop {
buf_writer.flush()?;
return Ok(false);
}
let mut bytes_read = source.read(&mut copy_buf)?;
total_bytes += bytes_read;
// If we read over (likely), truncate our read to
// the right size
if total_bytes > size {
let over = total_bytes - size;
bytes_read -= over;
total_bytes = size;
}
buf_writer.write_all(&copy_buf[0..bytes_read])?;
progress.add(bytes_read);
if total_bytes >= size {
break;
}
}
buf_writer.flush()?;
Ok(true)
}
+1
View File
@@ -1,4 +1,5 @@
#![feature(iterator_try_collect)]
#![feature(lock_value_accessors)]
pub mod collections;
pub mod downloads;
+37 -35
View File
@@ -5,8 +5,10 @@ use database::{
};
use log::{debug, error, warn};
use remote::{
auth::generate_authorization_header, error::RemoteAccessError, requests::generate_url,
utils::DROP_CLIENT_SYNC,
auth::generate_authorization_header,
error::RemoteAccessError,
requests::generate_url,
utils::DROP_CLIENT_ASYNC
};
use serde::{Deserialize, Serialize};
use std::fs::remove_dir_all;
@@ -18,9 +20,9 @@ use crate::state::{GameStatusManager, GameStatusWithTransient};
#[derive(Serialize, Deserialize, Debug)]
pub struct FetchGameStruct {
game: Game,
status: GameStatusWithTransient,
version: Option<GameVersion>,
pub game: Game,
pub status: GameStatusWithTransient,
pub version: Option<GameVersion>,
}
impl FetchGameStruct {
@@ -36,17 +38,19 @@ impl FetchGameStruct {
#[derive(Serialize, Deserialize, Clone, Debug, Default, Encode, Decode)]
#[serde(rename_all = "camelCase")]
pub struct Game {
id: String,
m_name: String,
m_short_description: String,
m_description: String,
pub id: String,
#[serde(rename = "type")]
pub game_type: String,
pub m_name: String,
pub m_short_description: String,
pub m_description: String,
// mDevelopers
// mPublishers
m_icon_object_id: String,
m_banner_object_id: String,
m_cover_object_id: String,
m_image_library_object_ids: Vec<String>,
m_image_carousel_object_ids: Vec<String>,
pub m_icon_object_id: String,
pub m_banner_object_id: String,
pub m_cover_object_id: String,
pub m_image_library_object_ids: Vec<String>,
pub m_image_carousel_object_ids: Vec<String>,
}
impl Game {
pub fn id(&self) -> &String {
@@ -87,7 +91,7 @@ pub fn set_partially_installed_db(
db_lock.applications.game_statuses.insert(
meta.id.clone(),
GameDownloadStatus::PartiallyInstalled {
version_name: meta.version.as_ref().unwrap().clone(),
version_name: meta.version.clone(),
install_dir,
},
);
@@ -193,38 +197,29 @@ pub fn get_current_meta(game_id: &String) -> Option<DownloadableMetadata> {
.cloned()
}
pub fn on_game_complete(
pub async fn on_game_complete(
meta: &DownloadableMetadata,
install_dir: String,
app_handle: &AppHandle,
) -> Result<(), RemoteAccessError> {
// Fetch game version information from remote
if meta.version.is_none() {
return Err(RemoteAccessError::GameNotFound(meta.id.clone()));
}
let client = DROP_CLIENT_SYNC.clone();
let response = generate_url(
&["/api/v1/client/game/version"],
&[
("id", &meta.id),
("version", meta.version.as_ref().unwrap()),
],
&["/api/v1/client/game", &meta.id, "version", &meta.version],
&[],
)?;
let response = client
let response = DROP_CLIENT_ASYNC
.get(response)
.header("Authorization", generate_authorization_header())
.send()?;
.send()
.await?;
let game_version: GameVersion = response.json()?;
let game_version: GameVersion = response.json().await?;
let mut handle = borrow_db_mut_checked();
handle
.applications
.game_versions
.entry(meta.id.clone())
.or_default()
.insert(meta.version.clone().unwrap(), game_version.clone());
.insert(meta.version.clone(), game_version.clone());
handle
.applications
.installed_game_version
@@ -232,14 +227,19 @@ pub fn on_game_complete(
drop(handle);
let status = if game_version.setup_command.is_empty() {
let setup_configuration = game_version
.setups
.iter()
.find(|v| v.platform == meta.target_platform);
let status = if setup_configuration.is_none() {
GameDownloadStatus::Installed {
version_name: meta.version.clone().unwrap(),
version_name: meta.version.clone(),
install_dir,
}
} else {
GameDownloadStatus::SetupRequired {
version_name: meta.version.clone().unwrap(),
version_name: meta.version.clone(),
install_dir,
}
};
@@ -260,6 +260,8 @@ pub fn on_game_complete(
}
);
app_emit!(app_handle, "update_library", ());
Ok(())
}
+4 -3
View File
@@ -4,7 +4,7 @@ use database::{DownloadType, DownloadableMetadata, borrow_db_mut_checked};
use log::warn;
use crate::{
downloads::drop_data::{DROP_DATA_PATH, DropData},
downloads::drop_data::{DROPDATA_PATH, DropData},
library::set_partially_installed_db,
};
@@ -15,7 +15,7 @@ pub fn scan_install_dirs() {
continue;
};
for game in files.into_iter().flatten() {
let drop_data_file = game.path().join(DROP_DATA_PATH);
let drop_data_file = game.path().join(DROPDATA_PATH);
if !drop_data_file.exists() {
continue;
}
@@ -33,7 +33,8 @@ pub fn scan_install_dirs() {
let metadata = DownloadableMetadata::new(
drop_data.game_id,
Some(drop_data.game_version),
drop_data.game_version,
drop_data.target_platform,
DownloadType::Game,
);
set_partially_installed_db(
+5 -7
View File
@@ -1,5 +1,5 @@
use database::models::data::{
ApplicationTransientStatus, Database, DownloadType, DownloadableMetadata, GameDownloadStatus,
ApplicationTransientStatus, Database, DownloadType, GameDownloadStatus,
};
pub type GameStatusWithTransient = (
@@ -13,12 +13,10 @@ impl GameStatusManager {
let online_state = database
.applications
.transient_statuses
.get(&DownloadableMetadata {
id: game_id.to_string(),
download_type: DownloadType::Game,
version: None,
})
.cloned();
.iter()
.find(|v| v.0.id == *game_id && v.0.download_type == DownloadType::Game)
.map(|v| v.1.clone())
.clone();
let offline_state = database.applications.game_statuses.get(game_id).cloned();