Merge branch 'downloads'
Adds the backend logic for downloading games, as well as debug buttons in the store page
This commit is contained in:
+1
-1
@@ -44,7 +44,7 @@ listen("auth/failed", () => {
|
||||
});
|
||||
|
||||
listen("auth/finished", () => {
|
||||
router.push("/");
|
||||
router.push("/store");
|
||||
});
|
||||
|
||||
useHead({
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
<template>
|
||||
<div
|
||||
class="h-16 cursor-pointer bg-zinc-950 flex flex-row justify-between"
|
||||
class="h-16 bg-zinc-950 flex flex-row justify-between"
|
||||
>
|
||||
<div @mousedown="() => window.startDragging()"
|
||||
class="flex flex-row grow items-center justify-between pl-5 pr-2 py-3">
|
||||
<div class="flex flex-row grow items-center pl-5 pr-2 py-3">
|
||||
<div class="inline-flex items-center gap-x-10">
|
||||
<NuxtLink to="/">
|
||||
<NuxtLink to="/store">
|
||||
<Wordmark class="h-8 mb-0.5"/>
|
||||
</NuxtLink>
|
||||
<nav class="inline-flex items-center mt-0.5">
|
||||
@@ -25,6 +24,7 @@
|
||||
</ol>
|
||||
</nav>
|
||||
</div>
|
||||
<div @mousedown="() => window.startDragging()" class="flex cursor-pointer grow h-full" />
|
||||
<div class="inline-flex items-center">
|
||||
<ol class="inline-flex gap-3">
|
||||
<li v-for="(item, itemIdx) in quickActions">
|
||||
|
||||
@@ -1,3 +1,69 @@
|
||||
<template>
|
||||
|
||||
</template>
|
||||
<button
|
||||
class="w-full rounded-md p-4 bg-blue-600 text-white"
|
||||
@click="queueGameWrapper"
|
||||
>
|
||||
Queue Game Download
|
||||
</button>
|
||||
<input placeholder="GAME ID" v-model="gameId" />
|
||||
<input placeholder="VERSION NAME" v-model="versionName" />
|
||||
<button
|
||||
class="w-full rounded-md p-4 bg-blue-600 text-white"
|
||||
@click="startGameDownloadsWrapper"
|
||||
>
|
||||
Start Game Downloads
|
||||
</button>
|
||||
<button
|
||||
class="w-full rounded-md p-4 bg-blue-600 text-white"
|
||||
@click="cancelGameDownloadWrapper"
|
||||
>
|
||||
Cancel game download
|
||||
</button>
|
||||
</template>
|
||||
<script setup lang="ts">
|
||||
import { invoke } from "@tauri-apps/api/core";
|
||||
|
||||
const gameId = ref("");
|
||||
const versionName = ref("");
|
||||
|
||||
async function queueGame() {
|
||||
await invoke("queue_game_download", {
|
||||
gameId: gameId.value,
|
||||
gameVersion: versionName.value,
|
||||
maxThreads: 12,
|
||||
});
|
||||
console.log("Requested game from FE");
|
||||
}
|
||||
function queueGameWrapper() {
|
||||
console.log("Wrapper started");
|
||||
queueGame()
|
||||
.then(() => {})
|
||||
.catch((e) => {
|
||||
console.log(e);
|
||||
});
|
||||
}
|
||||
async function startGameDownloads() {
|
||||
console.log("Downloading Games");
|
||||
await invoke("start_game_downloads", { maxThreads: 4 })
|
||||
console.log("Finished downloading games");
|
||||
}
|
||||
function startGameDownloadsWrapper() {
|
||||
startGameDownloads()
|
||||
.then(() => {})
|
||||
.catch((e) => {
|
||||
console.log(e)
|
||||
})
|
||||
}
|
||||
async function cancelGameDownload() {
|
||||
console.log("Cancelling game download");
|
||||
await invoke("stop_specific_game_download", { gameId: gameId.value })
|
||||
}
|
||||
function cancelGameDownloadWrapper() {
|
||||
console.log("Triggered game cancel wrapper");
|
||||
cancelGameDownload()
|
||||
.then(() => {})
|
||||
.catch((e) => {
|
||||
console.log(e)
|
||||
})
|
||||
}
|
||||
</script>
|
||||
|
||||
Generated
+75
@@ -1021,13 +1021,16 @@ dependencies = [
|
||||
"ciborium",
|
||||
"directories",
|
||||
"env_logger",
|
||||
"gxhash",
|
||||
"hex",
|
||||
"http",
|
||||
"log",
|
||||
"md5",
|
||||
"openssl",
|
||||
"rayon",
|
||||
"reqwest",
|
||||
"rustbreak",
|
||||
"rustix",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"structured-logger",
|
||||
@@ -1037,8 +1040,11 @@ dependencies = [
|
||||
"tauri-plugin-dialog",
|
||||
"tauri-plugin-shell",
|
||||
"tauri-plugin-single-instance",
|
||||
"tokio",
|
||||
"url",
|
||||
"urlencoding",
|
||||
"uuid",
|
||||
"versions",
|
||||
"webbrowser",
|
||||
]
|
||||
|
||||
@@ -1692,6 +1698,15 @@ dependencies = [
|
||||
"syn 2.0.79",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "gxhash"
|
||||
version = "2.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "09f0c897148ec6ff3ca864b7c886df75e6ba09972d206bd9a89af0c18c992253"
|
||||
dependencies = [
|
||||
"rand 0.8.5",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "h2"
|
||||
version = "0.4.6"
|
||||
@@ -2030,6 +2045,15 @@ version = "1.70.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.13.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
|
||||
dependencies = [
|
||||
"either",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "0.4.8"
|
||||
@@ -2260,6 +2284,12 @@ version = "0.1.10"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "2532096657941c2fea9c289d370a250971c689d4f143798ff67113ec042024a5"
|
||||
|
||||
[[package]]
|
||||
name = "md5"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771"
|
||||
|
||||
[[package]]
|
||||
name = "memchr"
|
||||
version = "2.7.4"
|
||||
@@ -2281,6 +2311,12 @@ version = "0.3.17"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
|
||||
|
||||
[[package]]
|
||||
name = "minimal-lexical"
|
||||
version = "0.2.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||
|
||||
[[package]]
|
||||
name = "miniz_oxide"
|
||||
version = "0.8.0"
|
||||
@@ -2394,6 +2430,16 @@ version = "0.1.14"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
|
||||
|
||||
[[package]]
|
||||
name = "nom"
|
||||
version = "7.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
"minimal-lexical",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "num-conv"
|
||||
version = "0.1.0"
|
||||
@@ -4500,10 +4546,22 @@ dependencies = [
|
||||
"pin-project-lite",
|
||||
"signal-hook-registry",
|
||||
"socket2",
|
||||
"tokio-macros",
|
||||
"tracing",
|
||||
"windows-sys 0.52.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-macros"
|
||||
version = "2.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752"
|
||||
dependencies = [
|
||||
"proc-macro2",
|
||||
"quote",
|
||||
"syn 2.0.79",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-native-tls"
|
||||
version = "0.3.1"
|
||||
@@ -4776,6 +4834,12 @@ dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "urlencoding"
|
||||
version = "2.1.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da"
|
||||
|
||||
[[package]]
|
||||
name = "urlpattern"
|
||||
version = "0.3.0"
|
||||
@@ -4877,6 +4941,17 @@ version = "0.9.5"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
|
||||
|
||||
[[package]]
|
||||
name = "versions"
|
||||
version = "6.3.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "f25d498b63d1fdb376b4250f39ab3a5ee8d103957346abacd911e2d8b612c139"
|
||||
dependencies = [
|
||||
"itertools",
|
||||
"nom",
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "vswhom"
|
||||
version = "0.1.0"
|
||||
|
||||
@@ -17,13 +17,17 @@ tauri-plugin-single-instance = { version = "2.0.0", features = ["deep-link"] }
|
||||
name = "drop_app_lib"
|
||||
crate-type = ["staticlib", "cdylib", "rlib"]
|
||||
|
||||
[build]
|
||||
rustflags = ["-C", "target-feature=+aes,+sse2"]
|
||||
|
||||
|
||||
[build-dependencies]
|
||||
tauri-build = { version = "2.0.0", features = [] }
|
||||
|
||||
[dependencies]
|
||||
tauri = { version = "2.0.0", features = [] }
|
||||
tauri-plugin-shell = "2.0.0"
|
||||
serde = { version = "1", features = ["derive"] }
|
||||
serde = { version = "1", features = ["derive", "rc"] }
|
||||
serde_json = "1"
|
||||
ciborium = "0.2.2"
|
||||
rayon = "1.10.0"
|
||||
@@ -37,6 +41,12 @@ hex = "0.4.3"
|
||||
tauri-plugin-dialog = "2"
|
||||
env_logger = "0.11.5"
|
||||
http = "1.1.0"
|
||||
tokio = { version = "1.40.0", features = ["rt", "tokio-macros"] }
|
||||
versions = { version = "6.3.2", features = ["serde"] }
|
||||
urlencoding = "2.1.3"
|
||||
rustix = "0.38.37"
|
||||
gxhash = "2.3.0"
|
||||
md5 = "0.7.0"
|
||||
|
||||
[dependencies.uuid]
|
||||
version = "1.10.0"
|
||||
|
||||
@@ -24,6 +24,7 @@ pub enum DatabaseGameStatus {
|
||||
Downloading,
|
||||
Installed,
|
||||
Updating,
|
||||
|
||||
Uninstalling,
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,186 @@
|
||||
use crate::auth::generate_authorization_header;
|
||||
use crate::db::{DatabaseImpls, DATA_ROOT_DIR};
|
||||
use crate::downloads::download_logic;
|
||||
use crate::downloads::manifest::{DropDownloadContext, DropManifest};
|
||||
use crate::downloads::progress::ProgressChecker;
|
||||
use crate::{AppState, DB};
|
||||
use log::info;
|
||||
use rustix::fs::{fallocate, FallocateFlags};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use urlencoding::encode;
|
||||
use std::fs::{create_dir_all, File};
|
||||
use std::path::Path;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize};
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub struct GameDownloadAgent {
|
||||
pub id: String,
|
||||
pub version: String,
|
||||
state: Mutex<GameDownloadState>,
|
||||
contexts: Mutex<Vec<DropDownloadContext>>,
|
||||
progress: ProgressChecker<DropDownloadContext>,
|
||||
pub manifest: Mutex<Option<DropManifest>>,
|
||||
pub callback: Arc<AtomicBool>
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq)]
|
||||
pub enum GameDownloadState {
|
||||
Uninitialised,
|
||||
Queued,
|
||||
Manifest,
|
||||
Downloading,
|
||||
Finished,
|
||||
Stalled,
|
||||
Failed,
|
||||
Cancelled,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Eq, PartialEq)]
|
||||
pub enum GameDownloadError {
|
||||
ManifestDownload,
|
||||
FailedContextGeneration,
|
||||
Status(u16),
|
||||
System(SystemError),
|
||||
}
|
||||
#[derive(Serialize, Deserialize, Clone, Eq, PartialEq, Debug)]
|
||||
pub enum SystemError {
|
||||
MutexLockFailed,
|
||||
}
|
||||
|
||||
impl GameDownloadAgent {
|
||||
pub fn new(id: String, version: String) -> Self {
|
||||
let callback = Arc::new(AtomicBool::new(false));
|
||||
Self {
|
||||
id,
|
||||
version,
|
||||
state: Mutex::from(GameDownloadState::Uninitialised),
|
||||
manifest: Mutex::new(None),
|
||||
callback: callback.clone(),
|
||||
progress: ProgressChecker::new(
|
||||
Box::new(download_logic::download_game_chunk),
|
||||
Arc::new(AtomicUsize::new(0)),
|
||||
callback
|
||||
),
|
||||
contexts: Mutex::new(Vec::new()),
|
||||
}
|
||||
}
|
||||
pub async fn queue(&self) -> Result<(), GameDownloadError> {
|
||||
self.change_state(GameDownloadState::Queued);
|
||||
if self.manifest.lock().unwrap().is_none() {
|
||||
return Ok(());
|
||||
}
|
||||
self.ensure_manifest_exists()
|
||||
}
|
||||
|
||||
pub fn begin_download(&self, max_threads: usize) -> Result<(), GameDownloadError> {
|
||||
self.change_state(GameDownloadState::Downloading);
|
||||
// TODO we're coping the whole context thing
|
||||
// It's not necessary, I just can't figure out to make the borrow checker happy
|
||||
{
|
||||
let lock = self.contexts.lock().unwrap().to_vec();
|
||||
self.progress
|
||||
.run_context_parallel(lock, max_threads);
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn ensure_manifest_exists(&self) -> Result<(), GameDownloadError> {
|
||||
if self.manifest.lock().unwrap().is_some() {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
self.download_manifest()
|
||||
}
|
||||
|
||||
fn download_manifest(&self) -> Result<(), GameDownloadError> {
|
||||
let base_url = DB.fetch_base_url();
|
||||
let manifest_url = base_url
|
||||
.join(
|
||||
format!(
|
||||
"/api/v1/client/metadata/manifest?id={}&version={}",
|
||||
self.id, encode(&self.version)
|
||||
)
|
||||
.as_str(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
let header = generate_authorization_header();
|
||||
|
||||
info!("Generating & sending client");
|
||||
let client = reqwest::blocking::Client::new();
|
||||
let response = client
|
||||
.get(manifest_url.to_string())
|
||||
.header("Authorization", header)
|
||||
.send()
|
||||
.unwrap();
|
||||
|
||||
if response.status() != 200 {
|
||||
info!("Error status: {}", response.status());
|
||||
return Err(GameDownloadError::Status(response.status().as_u16()));
|
||||
}
|
||||
|
||||
let manifest_download = response.json::<DropManifest>().unwrap();
|
||||
if let Ok(mut manifest) = self.manifest.lock() {
|
||||
*manifest = Some(manifest_download)
|
||||
} else {
|
||||
return Err(GameDownloadError::System(SystemError::MutexLockFailed));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn change_state(&self, state: GameDownloadState) {
|
||||
let mut lock = self.state.lock().unwrap();
|
||||
*lock = state;
|
||||
}
|
||||
pub fn get_state(&self) -> GameDownloadState {
|
||||
let lock = self.state.lock().unwrap();
|
||||
lock.clone()
|
||||
}
|
||||
|
||||
pub fn generate_job_contexts(
|
||||
&self,
|
||||
manifest: &DropManifest,
|
||||
version: String,
|
||||
game_id: String,
|
||||
) -> Result<(), GameDownloadError> {
|
||||
let mut contexts = Vec::new();
|
||||
let base_path = DATA_ROOT_DIR.join("games").join(game_id.clone()).clone();
|
||||
create_dir_all(base_path.clone()).unwrap();
|
||||
info!("Generating contexts");
|
||||
for (raw_path, chunk) in manifest {
|
||||
let path = base_path.join(Path::new(raw_path));
|
||||
|
||||
let container = path.parent().unwrap();
|
||||
create_dir_all(container).unwrap();
|
||||
|
||||
let file = File::create(path.clone()).unwrap();
|
||||
let mut running_offset = 0;
|
||||
|
||||
for (i, length) in chunk.lengths.iter().enumerate() {
|
||||
contexts.push(DropDownloadContext {
|
||||
file_name: raw_path.to_string(),
|
||||
version: version.to_string(),
|
||||
offset: running_offset,
|
||||
index: i,
|
||||
game_id: game_id.to_string(),
|
||||
path: path.clone(),
|
||||
checksum: chunk.checksums[i].clone()
|
||||
});
|
||||
running_offset += *length as u64;
|
||||
}
|
||||
|
||||
if running_offset > 0 {
|
||||
fallocate(file, FallocateFlags::empty(), 0, running_offset).unwrap();
|
||||
}
|
||||
}
|
||||
info!("Finished generating");
|
||||
if let Ok(mut context_lock) = self.contexts.lock() {
|
||||
*context_lock = contexts;
|
||||
} else {
|
||||
return Err(GameDownloadError::FailedContextGeneration);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,96 @@
|
||||
use std::{sync::{atomic::Ordering, Arc, Mutex}, thread};
|
||||
|
||||
use log::info;
|
||||
|
||||
use crate::{downloads::download_agent::GameDownloadAgent, AppState};
|
||||
|
||||
use super::download_agent::{GameDownloadError, GameDownloadState};
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn queue_game_download(
|
||||
game_id: String,
|
||||
game_version: String,
|
||||
state: tauri::State<'_, Mutex<AppState>>,
|
||||
) -> Result<(), GameDownloadError> {
|
||||
info!("Queuing Game Download");
|
||||
let download_agent = Arc::new(GameDownloadAgent::new(game_id.clone(), game_version.clone()));
|
||||
download_agent.queue().await?;
|
||||
|
||||
let mut queue = state.lock().unwrap();
|
||||
queue.game_downloads.insert(game_id, download_agent);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn start_game_downloads(
|
||||
max_threads: usize,
|
||||
state: tauri::State<'_, Mutex<AppState>>,
|
||||
) -> Result<(), GameDownloadError> {
|
||||
info!("Downloading Games");
|
||||
let lock = state.lock().unwrap();
|
||||
let mut game_downloads = lock.game_downloads.clone();
|
||||
drop(lock);
|
||||
thread::spawn(move || {
|
||||
loop {
|
||||
let mut current_id = String::new();
|
||||
let mut download_agent = None;
|
||||
{
|
||||
for (id, agent) in &game_downloads {
|
||||
if agent.get_state() == GameDownloadState::Queued {
|
||||
download_agent = Some(agent.clone());
|
||||
current_id = id.clone();
|
||||
info!("Got queued game to download");
|
||||
break;
|
||||
}
|
||||
}
|
||||
if download_agent.is_none() {
|
||||
info!("No more games left to download");
|
||||
return;
|
||||
}
|
||||
};
|
||||
info!("Downloading game");
|
||||
{
|
||||
start_game_download(max_threads, download_agent.unwrap()).unwrap();
|
||||
game_downloads.remove_entry(¤t_id);
|
||||
}
|
||||
}
|
||||
});
|
||||
info!("Spawned download");
|
||||
return Ok(())
|
||||
}
|
||||
|
||||
pub fn start_game_download(
|
||||
max_threads: usize,
|
||||
download_agent: Arc<GameDownloadAgent>
|
||||
) -> Result<(), GameDownloadError> {
|
||||
info!("Triggered Game Download");
|
||||
|
||||
|
||||
download_agent.ensure_manifest_exists()?;
|
||||
|
||||
let local_manifest = {
|
||||
let manifest = download_agent.manifest.lock().unwrap();
|
||||
(*manifest).clone().unwrap()
|
||||
};
|
||||
|
||||
download_agent.generate_job_contexts(&local_manifest, download_agent.version.clone(), download_agent.id.clone()).unwrap();
|
||||
|
||||
download_agent.begin_download(max_threads).unwrap();
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
pub async fn stop_specific_game_download(state: tauri::State<'_, Mutex<AppState>>, game_id: String) -> Result<(), String> {
|
||||
info!("called stop_specific_game_download");
|
||||
let lock = state.lock().unwrap();
|
||||
let download_agent = lock.game_downloads.get(&game_id).unwrap();
|
||||
|
||||
let callback = download_agent.callback.clone();
|
||||
drop(lock);
|
||||
|
||||
info!("Stopping callback");
|
||||
callback.store(true, Ordering::Release);
|
||||
|
||||
return Ok(())
|
||||
}
|
||||
@@ -0,0 +1,124 @@
|
||||
use crate::auth::generate_authorization_header;
|
||||
use crate::db::DatabaseImpls;
|
||||
use crate::downloads::manifest::DropDownloadContext;
|
||||
use crate::DB;
|
||||
use gxhash::{gxhash128, GxHasher};
|
||||
use log::info;
|
||||
use md5::{Context, Digest};
|
||||
use reqwest::blocking::Response;
|
||||
use std::{fs::{File, OpenOptions}, hash::Hasher, io::{self, BufReader, BufWriter, Error, ErrorKind, Read, Seek, SeekFrom, Write}, path::PathBuf, sync::{atomic::{AtomicBool, Ordering}, Arc}};
|
||||
use urlencoding::encode;
|
||||
|
||||
pub struct DropFileWriter {
|
||||
file: File,
|
||||
hasher: Context,
|
||||
callback: Arc<AtomicBool>
|
||||
}
|
||||
impl DropFileWriter {
|
||||
fn new(path: PathBuf, callback: Arc<AtomicBool>) -> Self {
|
||||
Self {
|
||||
file: OpenOptions::new().write(true).open(path).unwrap(),
|
||||
hasher: Context::new(),
|
||||
callback
|
||||
}
|
||||
}
|
||||
fn finish(mut self) -> io::Result<Digest> {
|
||||
self.flush().unwrap();
|
||||
Ok(self.hasher.compute())
|
||||
}
|
||||
}
|
||||
// TODO: Implement error handling
|
||||
impl Write for DropFileWriter {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
if self.callback.load(Ordering::Acquire) {
|
||||
return Err(Error::new(ErrorKind::ConnectionAborted, "Interrupt command recieved"));
|
||||
}
|
||||
|
||||
//info!("Writing data to writer");
|
||||
self.hasher.write_all(buf).unwrap();
|
||||
self.file.write(buf)
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
self.hasher.flush()?;
|
||||
self.file.flush()
|
||||
}
|
||||
}
|
||||
impl Seek for DropFileWriter {
|
||||
fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
|
||||
self.file.seek(pos)
|
||||
}
|
||||
}
|
||||
pub fn download_game_chunk(ctx: DropDownloadContext, callback: Arc<AtomicBool>) {
|
||||
if callback.load(Ordering::Acquire) {
|
||||
info!("Callback stopped download at start");
|
||||
return;
|
||||
}
|
||||
let base_url = DB.fetch_base_url();
|
||||
|
||||
let client = reqwest::blocking::Client::new();
|
||||
let chunk_url = base_url
|
||||
.join(&format!(
|
||||
"/api/v1/client/chunk?id={}&version={}&name={}&chunk={}",
|
||||
// Encode the parts we don't trust
|
||||
ctx.game_id,
|
||||
encode(&ctx.version),
|
||||
encode(&ctx.file_name),
|
||||
ctx.index
|
||||
))
|
||||
.unwrap();
|
||||
|
||||
let header = generate_authorization_header();
|
||||
|
||||
let mut response = client
|
||||
.get(chunk_url)
|
||||
.header("Authorization", header)
|
||||
.send()
|
||||
.unwrap();
|
||||
|
||||
let mut file: DropFileWriter = DropFileWriter::new(ctx.path, callback);
|
||||
|
||||
if ctx.offset != 0 {
|
||||
file
|
||||
.seek(SeekFrom::Start(ctx.offset))
|
||||
.expect("Failed to seek to file offset");
|
||||
}
|
||||
|
||||
// Writing everything to disk directly is probably slightly faster because it balances out the writes,
|
||||
// but this is better than the performance loss from constantly reading the callbacks
|
||||
|
||||
//let mut writer = BufWriter::with_capacity(1024 * 1024, file);
|
||||
|
||||
//copy_to_drop_file_writer(&mut response, &mut file);
|
||||
match io::copy(&mut response, &mut file) {
|
||||
Ok(_) => {},
|
||||
Err(e) => { info!("Copy errored with error {}", e)},
|
||||
}
|
||||
|
||||
let res = hex::encode(file.finish().unwrap().0);
|
||||
if res != ctx.checksum {
|
||||
info!("Checksum failed. Original: {}, Calculated: {} for {}", ctx.checksum, res, ctx.file_name);
|
||||
}
|
||||
|
||||
// stream.flush().unwrap();
|
||||
}
|
||||
|
||||
pub fn copy_to_drop_file_writer(response: &mut Response, writer: &mut DropFileWriter) {
|
||||
loop {
|
||||
info!("Writing to file writer");
|
||||
let mut buf = [0u8; 1024];
|
||||
response.read(&mut buf).unwrap();
|
||||
match writer.write_all(&buf) {
|
||||
Ok(_) => {},
|
||||
Err(e) => {
|
||||
match e.kind() {
|
||||
ErrorKind::Interrupted => {
|
||||
info!("Interrupted");
|
||||
return;
|
||||
}
|
||||
_ => { println!("{}", e); return;}
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,25 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
use std::fs::File;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
pub type DropManifest = HashMap<String, DropChunk>;
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Ord, PartialOrd, Eq, PartialEq)]
|
||||
pub struct DropChunk {
|
||||
pub permissions: usize,
|
||||
pub ids: Vec<String>,
|
||||
pub checksums: Vec<String>,
|
||||
pub lengths: Vec<usize>,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
||||
pub struct DropDownloadContext {
|
||||
pub file_name: String,
|
||||
pub version: String,
|
||||
pub index: usize,
|
||||
pub offset: u64,
|
||||
pub game_id: String,
|
||||
pub path: PathBuf,
|
||||
pub checksum: String
|
||||
}
|
||||
@@ -0,0 +1,5 @@
|
||||
mod manifest;
|
||||
pub mod progress;
|
||||
pub mod download_agent;
|
||||
mod download_logic;
|
||||
pub mod download_commands;
|
||||
@@ -0,0 +1,73 @@
|
||||
use log::info;
|
||||
use rayon::ThreadPoolBuilder;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::Arc;
|
||||
|
||||
pub struct ProgressChecker<T>
|
||||
where
|
||||
T: 'static + Send + Sync,
|
||||
{
|
||||
counter: Arc<AtomicUsize>,
|
||||
f: Arc<Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>>,
|
||||
callback: Arc<AtomicBool>
|
||||
}
|
||||
|
||||
impl<T> ProgressChecker<T>
|
||||
where
|
||||
T: Send + Sync,
|
||||
{
|
||||
pub fn new(
|
||||
f: Box<dyn Fn(T, Arc<AtomicBool>) + Send + Sync + 'static>,
|
||||
counter_reference: Arc<AtomicUsize>,
|
||||
callback: Arc<AtomicBool>
|
||||
) -> Self {
|
||||
Self {
|
||||
f: f.into(),
|
||||
counter: counter_reference,
|
||||
callback
|
||||
}
|
||||
}
|
||||
pub fn run_contexts_sequentially(&self, contexts: Vec<T>) {
|
||||
for context in contexts {
|
||||
(self.f)(context, self.callback.clone());
|
||||
self.counter.fetch_add(1, Ordering::Release);
|
||||
}
|
||||
}
|
||||
pub fn run_contexts_parallel_background(&self, contexts: Vec<T>, max_threads: usize) {
|
||||
let threads = ThreadPoolBuilder::new()
|
||||
// If max_threads == 0, then the limit will be determined
|
||||
// by Rayon's internal RAYON_NUM_THREADS
|
||||
.num_threads(max_threads)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
for context in contexts {
|
||||
let callback = self.callback.clone();
|
||||
let f = self.f.clone();
|
||||
threads.spawn(move || f(context, callback));
|
||||
}
|
||||
}
|
||||
pub fn run_context_parallel(&self, contexts: Vec<T>, max_threads: usize) {
|
||||
let threads = ThreadPoolBuilder::new()
|
||||
.num_threads(max_threads)
|
||||
.build()
|
||||
.unwrap();
|
||||
|
||||
threads.scope(|s| {
|
||||
for context in contexts {
|
||||
let callback = self.callback.clone();
|
||||
let f = self.f.clone();
|
||||
s.spawn(move |_| {info!("Running thread"); f(context, callback)});
|
||||
}
|
||||
});
|
||||
info!("Concluded scope");
|
||||
|
||||
}
|
||||
pub fn get_progress(&self) -> usize {
|
||||
self.counter.load(Ordering::Relaxed)
|
||||
}
|
||||
// I strongly dislike type casting in my own code, so I've shovelled it into here
|
||||
pub fn get_progress_percentage<C: Into<f64>>(&self, capacity: C) -> f64 {
|
||||
(self.get_progress() as f64) / (capacity.into())
|
||||
}
|
||||
}
|
||||
@@ -2,10 +2,13 @@ mod auth;
|
||||
mod db;
|
||||
mod library;
|
||||
mod remote;
|
||||
mod unpacker;
|
||||
mod downloads;
|
||||
#[cfg(test)]
|
||||
mod tests;
|
||||
|
||||
use auth::{auth_initiate, generate_authorization_header, recieve_handshake};
|
||||
use db::{DatabaseInterface, DATA_ROOT_DIR};
|
||||
use downloads::download_commands::{queue_game_download, start_game_downloads, stop_specific_game_download};
|
||||
use env_logger::Env;
|
||||
use http::{header::*, response::Builder as ResponseBuilder};
|
||||
use library::{fetch_game, fetch_library, Game};
|
||||
@@ -15,8 +18,10 @@ use serde::{Deserialize, Serialize};
|
||||
use std::{
|
||||
collections::HashMap, sync::{LazyLock, Mutex}
|
||||
};
|
||||
use std::sync::Arc;
|
||||
use tauri_plugin_deep_link::DeepLinkExt;
|
||||
use crate::db::DatabaseImpls;
|
||||
use crate::downloads::download_agent::{GameDownloadAgent};
|
||||
|
||||
#[derive(Clone, Copy, Serialize)]
|
||||
pub enum AppStatus {
|
||||
@@ -41,6 +46,9 @@ pub struct AppState {
|
||||
status: AppStatus,
|
||||
user: Option<User>,
|
||||
games: HashMap<String, Game>,
|
||||
|
||||
#[serde(skip_serializing)]
|
||||
game_downloads: HashMap<String, Arc<GameDownloadAgent>>
|
||||
}
|
||||
|
||||
#[tauri::command]
|
||||
@@ -60,6 +68,7 @@ fn setup() -> AppState {
|
||||
status: AppStatus::NotConfigured,
|
||||
user: None,
|
||||
games: HashMap::new(),
|
||||
game_downloads: HashMap::new(),
|
||||
};
|
||||
}
|
||||
|
||||
@@ -68,6 +77,7 @@ fn setup() -> AppState {
|
||||
status: auth_result.0,
|
||||
user: auth_result.1,
|
||||
games: HashMap::new(),
|
||||
game_downloads: HashMap::new(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -101,6 +111,10 @@ pub fn run() {
|
||||
// Library
|
||||
fetch_library,
|
||||
fetch_game,
|
||||
// Downloads
|
||||
queue_game_download,
|
||||
start_game_downloads,
|
||||
stop_specific_game_download
|
||||
])
|
||||
.plugin(tauri_plugin_shell::init())
|
||||
.setup(|app| {
|
||||
|
||||
@@ -0,0 +1 @@
|
||||
mod progress_tests;
|
||||
@@ -0,0 +1,23 @@
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize};
|
||||
use crate::downloads::progress::ProgressChecker;
|
||||
|
||||
#[test]
|
||||
fn test_progress_sequentially() {
|
||||
let counter = Arc::new(AtomicUsize::new(0));
|
||||
let callback = Arc::new(AtomicBool::new(false));
|
||||
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback);
|
||||
p.run_contexts_sequentially((1..100).collect());
|
||||
println!("Progress: {}", p.get_progress_percentage(100));
|
||||
}
|
||||
#[test]
|
||||
fn test_progress_parallel() {
|
||||
let counter = Arc::new(AtomicUsize::new(0));
|
||||
let callback = Arc::new(AtomicBool::new(false));
|
||||
let p = ProgressChecker::new(Box::new(test_fn), counter.clone(), callback);
|
||||
p.run_contexts_parallel_background((1..100).collect(), 10);
|
||||
}
|
||||
|
||||
fn test_fn(int: usize, callback: Arc<AtomicBool>) {
|
||||
println!("{}", int);
|
||||
}
|
||||
@@ -1,71 +0,0 @@
|
||||
use ciborium::from_reader;
|
||||
use rayon::iter::{IntoParallelIterator, ParallelIterator};
|
||||
use serde::Deserialize;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
fs::{create_dir_all, File},
|
||||
io::{self, BufReader, Error, Seek, Write},
|
||||
path::Path,
|
||||
};
|
||||
|
||||
#[cfg(unix)]
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all="camelCase")]
|
||||
struct ManifestChunk {
|
||||
uuid: String,
|
||||
index: i64,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all="camelCase")]
|
||||
struct ManifestRecord {
|
||||
chunks: Vec<ManifestChunk>,
|
||||
permissions: u32,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all="camelCase")]
|
||||
struct Manifest {
|
||||
record: HashMap<String, ManifestRecord>,
|
||||
}
|
||||
|
||||
pub async fn unpack() -> Result<(), Error> {
|
||||
let chunk_size: u64 = 1024 * 1024 * 16;
|
||||
|
||||
let input = Path::new("/home/decduck/Dev/droplet-output");
|
||||
let output = Path::new("/home/decduck/Dev/droplet-rebuilt");
|
||||
|
||||
let manifest_path = input.join("manifest.drop");
|
||||
let manifest_file_handle = File::open(manifest_path).unwrap();
|
||||
|
||||
let manifest: Manifest = from_reader(manifest_file_handle).unwrap();
|
||||
manifest.record.into_par_iter().for_each(|(key, value)| {
|
||||
let file = output.join(key.clone());
|
||||
create_dir_all(file.parent().unwrap()).unwrap();
|
||||
let mut file_handle = File::create(file).unwrap();
|
||||
|
||||
#[cfg(unix)]
|
||||
{
|
||||
let mut file_permissions = file_handle.metadata().unwrap().permissions();
|
||||
file_permissions.set_mode(value.permissions);
|
||||
file_handle.set_permissions(file_permissions).unwrap();
|
||||
}
|
||||
|
||||
for chunk in value.chunks {
|
||||
let chunk_path = input.join(chunk.uuid + ".bin");
|
||||
let chunk_handle = File::open(chunk_path).unwrap();
|
||||
|
||||
let mut chunk_reader = BufReader::new(chunk_handle);
|
||||
|
||||
let offset = u64::try_from(chunk.index).unwrap() * chunk_size;
|
||||
file_handle.seek(io::SeekFrom::Start(offset)).unwrap();
|
||||
|
||||
io::copy(&mut chunk_reader, &mut file_handle).unwrap();
|
||||
file_handle.flush().unwrap();
|
||||
}
|
||||
});
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user