feat: protobuf communication

This commit is contained in:
DecDuck
2026-02-04 13:44:07 +11:00
parent 34f2a92785
commit aa46a88957
19 changed files with 520 additions and 261 deletions
+2
View File
@@ -1 +1,3 @@
/target
/src/proto/*
!/src/proto/.gitkeep
+211 -14
View File
@@ -8,11 +8,20 @@ version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"cipher",
"cpufeatures",
]
[[package]]
name = "ahash"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8fd72866655d1904d6b0997d0b07ba561047d070fbe29de039031c641b61217"
dependencies = [
"const-random",
]
[[package]]
name = "aho-corasick"
version = "1.1.4"
@@ -240,6 +249,12 @@ dependencies = [
"shlex",
]
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]]
name = "cfg-if"
version = "1.0.4"
@@ -323,6 +338,26 @@ dependencies = [
"windows-sys 0.59.0",
]
[[package]]
name = "const-random"
version = "0.1.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87e00182fe74b066627d63b85fd550ac2998d4b0bd86bfed477a0ae4c7c71359"
dependencies = [
"const-random-macro",
]
[[package]]
name = "const-random-macro"
version = "0.1.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9d839f2a20b0aee515dc581a6172f2321f96cab76c1a38a4c584a194955390e"
dependencies = [
"getrandom 0.2.16",
"once_cell",
"tiny-keccak",
]
[[package]]
name = "cpufeatures"
version = "0.2.17"
@@ -418,15 +453,26 @@ dependencies = [
"cipher",
]
[[package]]
name = "dashmap"
version = "3.11.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f260e2fc850179ef410018660006951c1b55b79e8087e87111a2c388994b9b5"
dependencies = [
"ahash",
"cfg-if 0.1.10",
"num_cpus",
]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"crossbeam-utils",
"hashbrown",
"hashbrown 0.14.5",
"lock_api",
"once_cell",
"parking_lot_core",
@@ -531,6 +577,12 @@ version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
[[package]]
name = "equivalent"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
[[package]]
name = "errno"
version = "0.3.14"
@@ -676,7 +728,7 @@ version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"js-sys",
"libc",
"wasi",
@@ -689,7 +741,7 @@ version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "899def5c37c4fd7b2664648c28120ecec138e4d395b459e5ca34f9cce2dd77fd"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"js-sys",
"libc",
"r-efi",
@@ -703,7 +755,7 @@ version = "2.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ea2d84b969582b4b1864a92dc5d27cd2b77b622a8d79306834f1be5ba20d84b"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"crunchy",
"zerocopy",
]
@@ -714,12 +766,33 @@ version = "0.14.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
[[package]]
name = "hashbrown"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
[[package]]
name = "hermit-abi"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc0fef456e4baa96da950455cd02c081ca953b141298e41db3fc7e36b1da849c"
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "home"
version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d"
dependencies = [
"windows-sys 0.61.2",
]
[[package]]
name = "http"
version = "1.4.0"
@@ -939,6 +1012,16 @@ dependencies = [
"icu_properties",
]
[[package]]
name = "indexmap"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017"
dependencies = [
"equivalent",
"hashbrown 0.16.1",
]
[[package]]
name = "inout"
version = "0.1.4"
@@ -1007,6 +1090,12 @@ version = "0.2.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de"
[[package]]
name = "linux-raw-sys"
version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab"
[[package]]
name = "linux-raw-sys"
version = "0.11.0"
@@ -1119,6 +1208,16 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_cpus"
version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91df4bbde75afed763b708b7eee1e8e7651e02d97f6d5dd763e89367e957b23b"
dependencies = [
"hermit-abi",
"libc",
]
[[package]]
name = "oid-registry"
version = "0.7.1"
@@ -1165,7 +1264,7 @@ version = "0.9.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"libc",
"redox_syscall",
"smallvec",
@@ -1261,6 +1360,57 @@ dependencies = [
"unicode-ident",
]
[[package]]
name = "protobuf"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d65a1d4ddae7d8b5de68153b48f6aa3bba8cb002b243dbdbc55a5afbc98f99f4"
dependencies = [
"once_cell",
"protobuf-support",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-codegen"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d3976825c0014bbd2f3b34f0001876604fe87e0c86cd8fa54251530f1544ace"
dependencies = [
"anyhow",
"once_cell",
"protobuf",
"protobuf-parse",
"regex",
"tempfile",
"thiserror 1.0.69",
]
[[package]]
name = "protobuf-parse"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973"
dependencies = [
"anyhow",
"indexmap",
"log",
"protobuf",
"protobuf-support",
"tempfile",
"thiserror 1.0.69",
"which",
]
[[package]]
name = "protobuf-support"
version = "3.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3e36c2f31e0a47f9280fb347ef5e461ffcd2c52dd520d8e216b52f93b0b0d7d6"
dependencies = [
"thiserror 1.0.69",
]
[[package]]
name = "quinn"
version = "0.11.9"
@@ -1477,7 +1627,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7"
dependencies = [
"cc",
"cfg-if",
"cfg-if 1.0.4",
"getrandom 0.2.16",
"libc",
"untrusted",
@@ -1508,6 +1658,19 @@ dependencies = [
"nom",
]
[[package]]
name = "rustix"
version = "0.38.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154"
dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys 0.4.15",
"windows-sys 0.59.0",
]
[[package]]
name = "rustix"
version = "1.1.2"
@@ -1517,7 +1680,7 @@ dependencies = [
"bitflags",
"errno",
"libc",
"linux-raw-sys",
"linux-raw-sys 0.11.0",
"windows-sys 0.61.2",
]
@@ -1655,7 +1818,7 @@ version = "0.10.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"cpufeatures",
"digest",
]
@@ -1760,7 +1923,7 @@ dependencies = [
"fastrand",
"getrandom 0.3.4",
"once_cell",
"rustix",
"rustix 1.1.2",
"windows-sys 0.61.2",
]
@@ -1835,6 +1998,15 @@ dependencies = [
"time-core",
]
[[package]]
name = "tiny-keccak"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2c9d3793400a45f954c52e73d068316d76b6f4e36977e3fcebb13a2721e80237"
dependencies = [
"crunchy",
]
[[package]]
name = "tinystr"
version = "0.8.2"
@@ -1931,12 +2103,14 @@ dependencies = [
"bytes",
"criterion",
"ctr",
"dashmap",
"dashmap 6.1.0",
"droplet-rs",
"file_open_limit",
"futures-util",
"log",
"pin-project-lite",
"protobuf",
"protobuf-codegen",
"rand",
"reqwest",
"serde",
@@ -1946,6 +2120,7 @@ dependencies = [
"tokio",
"tokio-util",
"url",
"waitmap",
]
[[package]]
@@ -2073,6 +2248,16 @@ version = "0.9.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
[[package]]
name = "waitmap"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28491611b6b9a0b9f027be139a4be792b13a20780100dd8b054d44dbf596d52b"
dependencies = [
"dashmap 3.11.10",
"smallvec",
]
[[package]]
name = "walkdir"
version = "2.5.0"
@@ -2113,7 +2298,7 @@ version = "0.2.106"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0d759f433fa64a2d763d1340820e46e111a7a5ab75f993d1852d70b03dbb80fd"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
@@ -2126,7 +2311,7 @@ version = "0.4.56"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "836d9622d604feee9e5de25ac10e3ea5f2d65b41eac0d9ce72eb5deae707ce7c"
dependencies = [
"cfg-if",
"cfg-if 1.0.4",
"js-sys",
"once_cell",
"wasm-bindgen",
@@ -2194,6 +2379,18 @@ dependencies = [
"rustls-pki-types",
]
[[package]]
name = "which"
version = "4.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "87ba24419a2078cd2b0f2ede2691b6c66d8e47836da3b6db8265ebad47afbfc7"
dependencies = [
"either",
"home",
"once_cell",
"rustix 0.38.44",
]
[[package]]
name = "winapi"
version = "0.3.9"
+5
View File
@@ -36,6 +36,8 @@ aes = "0.8.4"
bytes = "*"
file_open_limit = "0.0.5"
pin-project-lite = "0.2.16"
protobuf = "3.7.2"
waitmap = "1.1.0"
[lints.clippy]
pedantic = { level = "warn", priority = -1 }
@@ -59,3 +61,6 @@ harness = false
criterion = { version = "0.8.0", features = ["async", "async_tokio"] }
rand = "0.9.2"
tempfile = "3.23.0"
[build-dependencies]
protobuf-codegen = "3.7.2"
+25
View File
@@ -0,0 +1,25 @@
use std::fs::{self, read_dir};
use protobuf_codegen::Codegen;
const OUT_DIR: &'static str = "./src/proto/";
fn main() {
let files = read_dir("./proto").unwrap();
let files = files.map(|v| format!("proto/{}", v.unwrap().file_name().into_string().unwrap()));
read_dir(OUT_DIR).unwrap().into_iter().for_each(|v| {
if let Ok(entry) = v {
if entry.file_name().to_str().unwrap().ends_with(".rs") {
fs::remove_file(entry.path()).unwrap();
}
}
});
Codegen::new()
.inputs(files)
.include("proto")
.out_dir(OUT_DIR)
.run()
.unwrap();
}
+24
View File
@@ -0,0 +1,24 @@
syntax = "proto3";
enum ResponseType {
ERROR = 0;
SERVER_GAMES_RESPONSE = 1;
VERSION_RESPONSE = 2;
}
message Response {
string message_id = 1;
ResponseType type = 2;
bytes data = 3;
}
enum QueryType {
SERVER_GAMES_QUERY = 0;
VERSION_QUERY = 1;
}
message Query {
string message_id = 1;
QueryType type = 2;
bytes data = 3;
}
+14
View File
@@ -0,0 +1,14 @@
syntax = "proto3";
message ServerGamesQuery {}
message ServerGamesResponse {
message SkeletonGame {
message SkeletonVersion {
string version_id = 1;
}
string id = 1;
repeated SkeletonVersion versions = 2;
}
repeated SkeletonGame games = 1;
}
+39
View File
@@ -0,0 +1,39 @@
syntax = "proto3";
message VersionQuery {
string version_id = 1;
}
message VersionResponse {
message Manifest {
string version = 1;
message ChunkData {
message FileEntry {
string filename = 1;
uint64 start = 2;
uint64 length = 3;
fixed32 permissions = 4;
}
repeated FileEntry files = 1;
string checksum = 2;
bytes iv = 3;
}
map<string, ChunkData> chunks = 2;
uint64 size = 3;
bytes key = 4;
}
Manifest manifest = 1;
message LibrarySource {
enum LibraryBackend {
FILESYSTEM = 0;
FLAT_FILESYSTEM = 1;
}
string options = 1; /// JSON
string id = 2;
LibraryBackend backend = 3;
}
LibrarySource source = 2;
string library_path = 3;
string version_path = 4;
}
+37
View File
@@ -0,0 +1,37 @@
use crate::proto::{self, version::version_response::Manifest};
fn fixed_length<T, const N: usize>(v: Vec<T>) -> [T; N] {
v.try_into()
.unwrap_or_else(|v: Vec<T>| panic!("Expected a Vec of length {} but it was {}", N, v.len()))
}
pub fn convert_protobuf_manifest(source: Manifest) -> droplet_rs::manifest::Manifest {
droplet_rs::manifest::Manifest {
version: source.version,
chunks: source
.chunks
.into_iter()
.map(|(id, chunk_data)| {
(
id,
droplet_rs::manifest::ChunkData {
files: chunk_data
.files
.into_iter()
.map(|file_entry| droplet_rs::manifest::FileEntry {
filename: file_entry.filename,
start: file_entry.start.try_into().unwrap(),
length: file_entry.length.try_into().unwrap(),
permissions: file_entry.permissions,
})
.collect(),
checksum: chunk_data.checksum,
iv: fixed_length(chunk_data.iv),
},
)
})
.collect(),
size: source.size,
key: fixed_length(source.key),
}
}
+16 -18
View File
@@ -1,16 +1,18 @@
use std::{path::PathBuf, time::Instant};
use anyhow::anyhow;
use droplet_rs::{
manifest::Manifest,
versions::{create_backend_constructor, types::VersionBackend},
};
use log::{info, warn};
use log::warn;
use reqwest::StatusCode;
use serde_json::Value;
use crate::{
remote::{LibraryBackend, VersionResponseBody, fetch_version_data},
state::AppInitData,
conversions::convert_protobuf_manifest,
proto::version::{VersionResponse, version_response::library_source::LibraryBackend},
server::download::fetch_version_data,
state::AppState,
util::ErrorOption,
};
@@ -29,16 +31,16 @@ impl DownloadContext {
}
pub async fn create_download_context(
init_data: &AppInitData,
app_state: &AppState,
game_id: String,
version_name: String,
) -> Result<DownloadContext, ErrorOption> {
let version_data = fetch_version_data(init_data, game_id, version_name.clone()).await?;
let version_data = fetch_version_data(app_state, game_id, version_name.clone()).await?;
let backend = create_backend(&version_data)?;
let download_context = DownloadContext {
manifest: version_data.manifest,
manifest: convert_protobuf_manifest(version_data.manifest.unwrap()),
backend,
last_access: Instant::now(),
};
@@ -47,21 +49,17 @@ pub async fn create_download_context(
}
fn create_backend(
version_data: &VersionResponseBody,
version_data: &VersionResponse,
) -> Result<Box<dyn VersionBackend + Send + Sync>, StatusCode> {
let base_path = version_data
.library
.options
.get("baseDir")
.unwrap()
.as_str()
.unwrap();
let base_path = serde_json::from_str::<Value>(&version_data.source.options)
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
let base_path = base_path.get("baseDir").unwrap().as_str().unwrap();
let version_path = PathBuf::from(base_path);
let version_path = version_path.join(version_data.library_path.clone());
let version_path = match version_data.library.backend {
LibraryBackend::Filesystem => version_path.join(version_data.version_path.clone()),
LibraryBackend::FlatFilesystem => version_path,
let version_path = match version_data.source.backend.unwrap() {
LibraryBackend::FILESYSTEM => version_path.join(version_data.version_path.clone()),
LibraryBackend::FLAT_FILESYSTEM => version_path,
};
if !version_path.exists() {
+10 -16
View File
@@ -7,7 +7,13 @@ use std::{
task::Poll,
};
use axum::{Json, body::Body, extract::State, http::{HeaderMap, HeaderValue}, response::IntoResponse};
use axum::{
Json,
body::Body,
extract::State,
http::{HeaderMap, HeaderValue},
response::IntoResponse,
};
use bytes::BufMut;
use reqwest::{StatusCode, header::CONTENT_TYPE};
use serde::{Deserialize, Serialize};
@@ -16,13 +22,9 @@ use std::io::Write;
use tokio::io::AsyncRead;
use tokio_util::io::ReaderStream;
use crate::{remote::fetch_instance_games, state::AppState};
use crate::{server::download::fetch_instance_games, state::AppState};
pub async fn healthcheck(State(state): State<Arc<AppState>>) -> StatusCode {
let initialised = state.token.initialized();
if !initialised {
return StatusCode::SERVICE_UNAVAILABLE;
}
StatusCode::OK
}
@@ -99,16 +101,8 @@ struct Manifest {
content: HashMap<String, Vec<GameData>>,
}
pub async fn manifest(
State(state): State<Arc<AppState>>,
) -> Result<impl IntoResponse, StatusCode> {
let games = fetch_instance_games(
state
.token
.get()
.ok_or(StatusCode::from_u16(503).unwrap())?,
)
.await?;
pub async fn manifest(State(state): State<Arc<AppState>>) -> Result<impl IntoResponse, StatusCode> {
let games = fetch_instance_games(&state).await?;
let mut content = HashMap::new();
for game in games {
+5 -5
View File
@@ -1,13 +1,13 @@
use tokio::sync::Semaphore;
mod download;
pub mod download;
pub mod serve;
pub mod handlers;
mod remote;
pub mod state;
mod token;
mod util;
pub mod util;
pub mod proto;
pub mod conversions;
pub mod server;
pub use download::DownloadContext;
pub use token::set_token;
static GLOBAL_CONTEXT_SEMAPHORE: Semaphore = Semaphore::const_new(1);
+7 -7
View File
@@ -1,19 +1,18 @@
use std::{
env::{self, set_current_dir},
env::set_current_dir,
sync::Arc,
time::{Duration, Instant},
};
use axum::{
Router, handler,
Router,
routing::{get, post},
};
use dashmap::DashMap;
use log::info;
use simple_logger::SimpleLogger;
use tokio::{runtime::Handle, spawn, sync::OnceCell, time};
use torrential::{handlers, serve, set_token, state::AppState};
use url::Url;
use tokio::{runtime::Handle, spawn, time};
use torrential::{handlers, serve, server::create_drop_server, state::AppState};
const CONTEXT_TTL: u64 = 10 * 60;
@@ -29,9 +28,11 @@ async fn main() {
let metrics = Handle::current().metrics();
info!("using {} threads", metrics.num_workers());
let server = create_drop_server().await.expect("failed to connect to drop server");
let shared_state = Arc::new(AppState {
token: OnceCell::new(),
context_cache: DashMap::new(),
server: server,
});
let interval_shared_state = shared_state.clone();
@@ -74,7 +75,6 @@ fn setup_app(shared_state: Arc<AppState>) -> Router {
)
.route("/api/v1/depot/manifest.json", get(handlers::manifest))
.route("/api/v1/depot/speedtest", get(handlers::speedtest))
.route("/key", post(set_token))
.route("/healthcheck", get(handlers::healthcheck))
.route("/invalidate", post(handlers::invalidate))
.with_state(shared_state)
View File
-134
View File
@@ -1,134 +0,0 @@
use std::{env, sync::LazyLock};
use anyhow::{Result, anyhow};
use async_trait::async_trait;
use droplet_rs::manifest::Manifest;
use log::info;
use reqwest::{Client, ClientBuilder, StatusCode};
use serde::{Deserialize, Serialize};
use url::Url;
use crate::{state::AppInitData, util::ErrorOption};
static CLIENT: LazyLock<Client> = LazyLock::new(|| {
ClientBuilder::new()
.build()
.expect("failed to build client")
});
static REMOTE_URL: LazyLock<Url> = LazyLock::new(|| {
let user_provided = env::var("DROP_SERVER_URL");
let url = Url::parse(
user_provided
.as_ref()
.map_or("http://localhost:3000", |v| v),
)
.expect("failed to parse URL");
info!("using Drop server url {}", url);
url
});
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct VersionResponseBody {
pub manifest: Manifest,
pub library: LibrarySource,
pub library_path: String,
pub version_path: String,
}
#[derive(Serialize)]
pub struct VersionQuery {
game: String,
version: String,
}
#[derive(Deserialize, Debug)]
#[non_exhaustive]
pub enum LibraryBackend {
Filesystem,
FlatFilesystem,
}
#[derive(Deserialize)]
pub struct LibrarySource {
pub options: serde_json::Value,
pub id: String,
pub backend: LibraryBackend,
}
pub async fn fetch_version_data(
init_data: &AppInitData,
game_id: String,
version_id: String,
) -> Result<VersionResponseBody, ErrorOption> {
let version_data_response = CLIENT
.get(REMOTE_URL.join("/api/v1/admin/depot/torrential/manifest")?)
.query(&VersionQuery {
game: game_id,
version: version_id,
})
.header("Authorization", format!("Bearer {}", init_data.key))
.send()
.await?;
if !version_data_response.status().is_success() {
if version_data_response.status() == StatusCode::BAD_REQUEST {
return Err(StatusCode::NOT_FOUND.into());
}
return Err(anyhow!(
"Fetching context failed with non-success code: {}, {}",
version_data_response.status(),
version_data_response
.text()
.await
.unwrap_or("(failed to read body)".to_owned())
)
.into());
}
let version_data: VersionResponseBody = version_data_response.json().await?;
Ok(version_data)
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SkeletonVersion {
pub version_id: String,
}
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SkeletonGame {
pub id: String,
pub versions: Vec<SkeletonVersion>,
}
pub async fn fetch_instance_games(
init_data: &AppInitData,
) -> Result<Vec<SkeletonGame>, ErrorOption> {
let context_response = CLIENT
.get(REMOTE_URL.join("/api/v1/admin/depot/torrential/versions")?)
.header("Authorization", format!("Bearer {}", init_data.key))
.send()
.await?;
if !context_response.status().is_success() {
return Err(anyhow!(
"Fetching instance games failed with non-success code: {}, {}",
context_response.status(),
context_response
.text()
.await
.unwrap_or("(failed to read body)".to_owned())
)
.into());
}
let games: Vec<SkeletonGame> = context_response.json().await?;
Ok(games)
}
+1 -3
View File
@@ -171,7 +171,6 @@ async fn get_or_create_context<'a>(
game_id: String,
version_name: String,
) -> Result<RefMut<'a, (String, String), DownloadContext>, StatusCode> {
let initialisation_data = state.token.get().ok_or(StatusCode::SERVICE_UNAVAILABLE)?;
let key = (game_id.clone(), version_name.clone());
if let Some(context) = context_cache.get_mut(&key) {
@@ -185,8 +184,7 @@ async fn get_or_create_context<'a>(
} else {
info!("generating context for {}...", game_id);
let context_result =
create_download_context(initialisation_data, game_id.clone(), version_name.clone())
.await?;
create_download_context(state, game_id.clone(), version_name.clone()).await?;
state.context_cache.insert(key.clone(), context_result);
+15
View File
@@ -0,0 +1,15 @@
use crate::{proto::{manifest::server_games_response::SkeletonGame, version::VersionResponse}, state::{AppState}, util::ErrorOption};
pub async fn fetch_version_data(
app_state: &AppState,
game_id: String,
version_id: String,
) -> Result<VersionResponse, ErrorOption> {
unreachable!();
}
pub async fn fetch_instance_games(
app_state: &AppState,
) -> Result<Vec<SkeletonGame>, ErrorOption> {
unreachable!()
}
+107
View File
@@ -0,0 +1,107 @@
use std::sync::{Arc, Mutex};
use anyhow::anyhow;
use protobuf::Message;
use tokio::{
io::{AsyncReadExt, AsyncWriteExt as _, BufReader},
net::{
TcpListener,
tcp::{OwnedReadHalf, OwnedWriteHalf},
},
spawn,
};
use waitmap::WaitMap;
use crate::proto::core::Response;
pub mod download;
pub struct DropServer {
write_stream: Mutex<OwnedWriteHalf>,
waitmap: WaitMap<String, Response>,
}
impl DropServer {
async fn recieve_subroutine(myself: Arc<DropServer>, read_stream: OwnedReadHalf) -> ! {
let mut buffered_reader = BufReader::new(read_stream);
loop {
let mut length_buffer: [u8; 8] = [0; 8];
buffered_reader
.read_exact(&mut length_buffer)
.await
.expect("failed to read from internal pipe");
let length = usize::from_le_bytes(length_buffer);
let mut buffer = Vec::with_capacity(length);
buffered_reader
.read_exact(&mut buffer)
.await
.expect("failed to read from internal pipe");
let message =
Response::parse_from_bytes(&buffer).expect("response didn't deserialize correctly");
myself.waitmap.insert(message.message_id.clone(), message);
}
}
async fn wait_for_message_id<T>(&self, message_id: &str) -> Result<T, anyhow::Error>
where
T: protobuf::Message,
{
let message = self
.waitmap
.wait(message_id.clone())
.await
.ok_or(anyhow!("no response returned for value"))?;
let message = message.value();
match message.type_.unwrap() {
crate::proto::core::ResponseType::ERROR => {
return Err(anyhow!(String::from_utf8(message.data.clone()).unwrap()));
}
_ => {
let response = T::parse_from_bytes(&message.data)?;
return Ok(response);
}
}
}
async fn send_message<T>(&self, message: T) -> Result<(), anyhow::Error>
where
T: protobuf::Message,
{
let mut buf = Vec::new();
message.write_to_vec(&mut buf)?;
{
let mut mutex_lock = self
.write_stream
.lock()
.expect("failed to lock send stream");
mutex_lock.write(&buf.len().to_le_bytes()).await?;
mutex_lock.write_all(&buf).await?;
};
Ok(())
}
}
pub async fn create_drop_server() -> Result<Arc<DropServer>, anyhow::Error> {
let server = TcpListener::bind("127.0.0.1:33148").await?;
let (drop_stream, _) = server.accept().await?;
let (read, write) = drop_stream.into_split();
let client = Arc::new(DropServer {
write_stream: Mutex::new(write),
waitmap: WaitMap::new(),
});
spawn(DropServer::recieve_subroutine(client.clone(), read));
Ok(client)
}
+2 -9
View File
@@ -3,16 +3,9 @@ use std::{collections::HashMap, path::PathBuf, sync::Arc};
use dashmap::DashMap;
use tokio::sync::{OnceCell, Semaphore};
use crate::
DownloadContext
;
use crate::{DownloadContext, server::DropServer};
pub struct AppState {
pub token: OnceCell<AppInitData>,
pub context_cache: DashMap<(String, String), DownloadContext>,
pub server: Arc<DropServer>,
}
#[derive(Debug)]
pub struct AppInitData {
pub key: String,
}
-55
View File
@@ -1,55 +0,0 @@
use std::sync::Arc;
use axum::{Json, extract::State};
use log::{error, info};
use reqwest::StatusCode;
use serde::Deserialize;
use crate::state::{AppInitData, AppState};
#[derive(Deserialize)]
pub struct TokenPayload {
key: String,
}
pub async fn set_token(
State(state): State<Arc<AppState>>,
Json(payload): Json<TokenPayload>,
) -> Result<StatusCode, StatusCode> {
if check_token_exists(&state, &payload) {
return Ok(StatusCode::OK);
}
let key = payload.key;
set_depot_key(&state, key)?;
info!("connected to drop server successfully");
Ok(StatusCode::OK)
}
fn check_token_exists(state: &Arc<AppState>, payload: &TokenPayload) -> bool {
if let Some(existing_data) = state.token.get() {
assert!(
*existing_data.key == payload.key,
"already set up but provided with a different token"
);
return true;
}
false
}
fn set_depot_key(
state: &Arc<AppState>,
key: String
) -> Result<(), StatusCode> {
state
.token
.set(AppInitData { key })
.map_err(|err| {
error!("failed to set token: {err:?}");
StatusCode::INTERNAL_SERVER_ERROR
})?;
Ok(())
}