feat: depot API
This commit is contained in:
+39
-56
@@ -1,16 +1,21 @@
|
||||
use std::{collections::HashMap, hash::RandomState, time::Instant};
|
||||
use std::{path::PathBuf, time::Instant};
|
||||
|
||||
use droplet_rs::versions::{create_backend_constructor, types::VersionBackend};
|
||||
use anyhow::anyhow;
|
||||
use droplet_rs::{
|
||||
manifest::Manifest,
|
||||
versions::{create_backend_constructor, types::VersionBackend},
|
||||
};
|
||||
use log::{info, warn};
|
||||
use reqwest::StatusCode;
|
||||
|
||||
use crate::{
|
||||
remote::{ContextResponseBody, LibraryBackend, ContextProvider},
|
||||
remote::{LibraryBackend, VersionResponseBody, fetch_version_data},
|
||||
state::AppInitData,
|
||||
util::ErrorOption,
|
||||
};
|
||||
|
||||
pub struct DownloadContext {
|
||||
pub(crate) chunk_lookup_table: HashMap<String, (String, usize, usize)>,
|
||||
pub(crate) manifest: Manifest,
|
||||
pub(crate) backend: Box<dyn VersionBackend + Send + Sync + 'static>,
|
||||
last_access: Instant,
|
||||
}
|
||||
@@ -24,33 +29,16 @@ impl DownloadContext {
|
||||
}
|
||||
|
||||
pub async fn create_download_context(
|
||||
metadata_provider: &dyn ContextProvider,
|
||||
backend_factory: &dyn BackendFactory,
|
||||
init_data: &AppInitData,
|
||||
game_id: String,
|
||||
version_name: String,
|
||||
) -> Result<DownloadContext, ErrorOption> {
|
||||
let context = metadata_provider
|
||||
.fetch_context(init_data.token(), game_id, version_name.clone())
|
||||
.await?;
|
||||
let version_data = fetch_version_data(init_data, game_id, version_name.clone()).await?;
|
||||
|
||||
let backend = backend_factory.create_backend(init_data, &context, &version_name)?;
|
||||
|
||||
let mut chunk_lookup_table = HashMap::with_capacity_and_hasher(
|
||||
context.manifest.values().map(|v| v.ids.len()).sum(),
|
||||
RandomState::default(),
|
||||
);
|
||||
|
||||
for (path, file_chunks) in context.manifest {
|
||||
let mut start = 0;
|
||||
for (chunk, length) in file_chunks.ids.into_iter().zip(file_chunks.lengths) {
|
||||
chunk_lookup_table.insert(chunk, (path.clone(), start, start + length));
|
||||
start += length;
|
||||
}
|
||||
}
|
||||
let backend = create_backend(&version_data)?;
|
||||
|
||||
let download_context = DownloadContext {
|
||||
chunk_lookup_table,
|
||||
manifest: version_data.manifest,
|
||||
backend,
|
||||
last_access: Instant::now(),
|
||||
};
|
||||
@@ -58,39 +46,34 @@ pub async fn create_download_context(
|
||||
Ok(download_context)
|
||||
}
|
||||
|
||||
pub trait BackendFactory: Send + Sync {
|
||||
fn create_backend(
|
||||
&self,
|
||||
init_data: &AppInitData,
|
||||
context: &ContextResponseBody,
|
||||
version_name: &String,
|
||||
) -> Result<Box<dyn VersionBackend + Send + Sync>, StatusCode>;
|
||||
}
|
||||
fn create_backend(
|
||||
version_data: &VersionResponseBody,
|
||||
) -> Result<Box<dyn VersionBackend + Send + Sync>, StatusCode> {
|
||||
let base_path = version_data
|
||||
.library
|
||||
.options
|
||||
.get("baseDir")
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.unwrap();
|
||||
|
||||
pub struct DropBackendFactory;
|
||||
impl BackendFactory for DropBackendFactory {
|
||||
fn create_backend(
|
||||
&self,
|
||||
init_data: &AppInitData,
|
||||
context: &ContextResponseBody,
|
||||
version_name: &String,
|
||||
) -> Result<Box<dyn VersionBackend + Send + Sync>, StatusCode> {
|
||||
let (version_path, backend) = init_data
|
||||
.libraries()
|
||||
.get(&context.library_id)
|
||||
.ok_or(StatusCode::NOT_FOUND)?;
|
||||
let version_path = PathBuf::from(base_path);
|
||||
let version_path = version_path.join(version_data.library_path.clone());
|
||||
let version_path = match version_data.library.backend {
|
||||
LibraryBackend::Filesystem => version_path.join(version_data.version_path.clone()),
|
||||
LibraryBackend::FlatFilesystem => version_path,
|
||||
};
|
||||
|
||||
let version_path = version_path.join(&context.library_path);
|
||||
let version_path = match backend {
|
||||
LibraryBackend::Filesystem => version_path.join(version_name),
|
||||
LibraryBackend::FlatFilesystem => version_path,
|
||||
};
|
||||
|
||||
let backend =
|
||||
create_backend_constructor(&version_path).ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
// TODO: Not eat this error
|
||||
let backend = backend().map_err(|_e| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
Ok(backend)
|
||||
if !version_path.exists() {
|
||||
warn!("{} path doesn't exist for version", version_path.display());
|
||||
return Err(StatusCode::INTERNAL_SERVER_ERROR);
|
||||
}
|
||||
|
||||
let backend =
|
||||
create_backend_constructor(&version_path).ok_or(StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
|
||||
let backend = backend()
|
||||
.inspect_err(|err| warn!("failed to create version backend: {:?}", err))
|
||||
.map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?;
|
||||
Ok(backend)
|
||||
}
|
||||
|
||||
+110
-8
@@ -1,10 +1,22 @@
|
||||
use std::sync::Arc;
|
||||
use std::{
|
||||
collections::HashMap,
|
||||
sync::{
|
||||
Arc,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
task::Poll,
|
||||
};
|
||||
|
||||
use axum::{Json, extract::State};
|
||||
use reqwest::StatusCode;
|
||||
use serde::Deserialize;
|
||||
use axum::{Json, body::Body, extract::State, http::{HeaderMap, HeaderValue}, response::IntoResponse};
|
||||
use bytes::BufMut;
|
||||
use reqwest::{StatusCode, header::CONTENT_TYPE};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::json;
|
||||
use std::io::Write;
|
||||
use tokio::io::AsyncRead;
|
||||
use tokio_util::io::ReaderStream;
|
||||
|
||||
use crate::state::AppState;
|
||||
use crate::{remote::fetch_instance_games, state::AppState};
|
||||
|
||||
pub async fn healthcheck(State(state): State<Arc<AppState>>) -> StatusCode {
|
||||
let initialised = state.token.initialized();
|
||||
@@ -16,14 +28,104 @@ pub async fn healthcheck(State(state): State<Arc<AppState>>) -> StatusCode {
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct InvalidateBody {
|
||||
game_id: String,
|
||||
version_name: String,
|
||||
game: String,
|
||||
version: String,
|
||||
}
|
||||
|
||||
pub async fn invalidate(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Json(payload): Json<InvalidateBody>,
|
||||
) -> StatusCode {
|
||||
state.context_cache.remove(&(payload.game_id, payload.version_name));
|
||||
state.context_cache.remove(&(payload.game, payload.version));
|
||||
StatusCode::OK
|
||||
}
|
||||
|
||||
struct SpeedtestStream {
|
||||
remaining: usize,
|
||||
}
|
||||
|
||||
impl SpeedtestStream {
|
||||
pub fn new() -> Self {
|
||||
SpeedtestStream {
|
||||
remaining: 1024 * 1024 * 50,
|
||||
}
|
||||
}
|
||||
fn content_length(&self) -> usize {
|
||||
self.remaining
|
||||
}
|
||||
}
|
||||
const ZERO: [u8; 1024] = [0u8; _];
|
||||
impl AsyncRead for SpeedtestStream {
|
||||
fn poll_read(
|
||||
mut self: std::pin::Pin<&mut Self>,
|
||||
cx: &mut std::task::Context<'_>,
|
||||
buf: &mut tokio::io::ReadBuf<'_>,
|
||||
) -> Poll<std::io::Result<()>> {
|
||||
if self.remaining > 0 {
|
||||
let mut writer = buf.writer();
|
||||
|
||||
let amount = writer.write(&ZERO);
|
||||
match amount {
|
||||
Ok(amount) => self.remaining -= amount,
|
||||
Err(err) => return Poll::Ready(Err(err)),
|
||||
};
|
||||
};
|
||||
return Poll::Ready(Ok(()));
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn speedtest() -> Result<impl IntoResponse, StatusCode> {
|
||||
let speedtest = SpeedtestStream::new();
|
||||
let ct = speedtest.content_length();
|
||||
let speedtest_stream = ReaderStream::new(speedtest);
|
||||
let body = Body::from_stream(speedtest_stream);
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
|
||||
headers.insert("Content-Length", ct.into());
|
||||
|
||||
Ok((headers, body))
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
struct GameData {
|
||||
version_id: String,
|
||||
compression: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
struct Manifest {
|
||||
content: HashMap<String, Vec<GameData>>,
|
||||
}
|
||||
|
||||
pub async fn manifest(
|
||||
State(state): State<Arc<AppState>>,
|
||||
) -> Result<impl IntoResponse, StatusCode> {
|
||||
let games = fetch_instance_games(
|
||||
state
|
||||
.token
|
||||
.get()
|
||||
.ok_or(StatusCode::from_u16(503).unwrap())?,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let mut content = HashMap::new();
|
||||
for game in games {
|
||||
content.insert(
|
||||
game.id,
|
||||
game.versions
|
||||
.into_iter()
|
||||
.map(|v| GameData {
|
||||
version_id: v.version_id,
|
||||
compression: "none".to_owned(),
|
||||
})
|
||||
.collect::<Vec<GameData>>(),
|
||||
);
|
||||
}
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
|
||||
|
||||
Ok((headers, json!(Manifest { content }).to_string()))
|
||||
}
|
||||
|
||||
@@ -2,17 +2,12 @@ use tokio::sync::Semaphore;
|
||||
mod download;
|
||||
pub mod serve;
|
||||
pub mod handlers;
|
||||
mod manifest;
|
||||
mod remote;
|
||||
pub mod state;
|
||||
mod token;
|
||||
mod util;
|
||||
|
||||
pub use download::DownloadContext;
|
||||
pub use download::{BackendFactory, DropBackendFactory};
|
||||
pub use remote::{
|
||||
DropLibraryProvider, DropContextProvider, LibraryConfigurationProvider, ContextProvider,
|
||||
};
|
||||
pub use token::set_token;
|
||||
|
||||
static GLOBAL_CONTEXT_SEMAPHORE: Semaphore = Semaphore::const_new(1);
|
||||
|
||||
+8
-25
@@ -4,16 +4,14 @@ use std::{
|
||||
};
|
||||
|
||||
use axum::{
|
||||
Router,
|
||||
Router, handler,
|
||||
routing::{get, post},
|
||||
};
|
||||
use dashmap::DashMap;
|
||||
use log::info;
|
||||
use simple_logger::SimpleLogger;
|
||||
use tokio::{runtime::Handle, sync::OnceCell};
|
||||
use torrential::{
|
||||
DropBackendFactory, DropContextProvider, DropLibraryProvider, handlers, serve, set_token, state::AppState
|
||||
};
|
||||
use torrential::{handlers, serve, set_token, state::AppState};
|
||||
use url::Url;
|
||||
|
||||
#[tokio::main]
|
||||
@@ -21,21 +19,16 @@ async fn main() {
|
||||
initialise_logger();
|
||||
|
||||
if let Ok(working_directory) = std::env::var("WORKING_DIRECTORY") {
|
||||
info!("moving to working directory {}", working_directory);
|
||||
set_current_dir(working_directory).expect("failed to change working directory");
|
||||
}
|
||||
|
||||
let metrics = Handle::current().metrics();
|
||||
info!("using {} threads", metrics.num_workers());
|
||||
|
||||
let remote_url = get_remote_url();
|
||||
|
||||
let shared_state = Arc::new(AppState {
|
||||
token: OnceCell::new(),
|
||||
context_cache: DashMap::new(),
|
||||
|
||||
metadata_provider: Arc::new(DropContextProvider::new(remote_url.clone())),
|
||||
backend_factory: Arc::new(DropBackendFactory),
|
||||
library_provider: Arc::new(DropLibraryProvider::new(remote_url)),
|
||||
});
|
||||
|
||||
let app = setup_app(shared_state);
|
||||
@@ -46,12 +39,14 @@ async fn main() {
|
||||
fn setup_app(shared_state: Arc<AppState>) -> Router {
|
||||
Router::new()
|
||||
.route(
|
||||
"/api/v1/depot/{game_id}/{version_name}/{*chunk_ids}",
|
||||
"/api/v1/depot/content/{game_id}/{version_name}/{chunk_id}",
|
||||
get(serve::serve_file),
|
||||
)
|
||||
.route("/token", post(set_token))
|
||||
.route("/api/v1/depot/manifest.json", get(handlers::manifest))
|
||||
.route("/api/v1/depot/speedtest", get(handlers::speedtest))
|
||||
.route("/key", post(set_token))
|
||||
.route("/healthcheck", get(handlers::healthcheck))
|
||||
.route("/invalid", post(handlers::invalidate))
|
||||
.route("/invalidate", post(handlers::invalidate))
|
||||
.with_state(shared_state)
|
||||
}
|
||||
|
||||
@@ -67,15 +62,3 @@ fn initialise_logger() {
|
||||
.init()
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
fn get_remote_url() -> Url {
|
||||
let user_provided = env::var("DROP_SERVER_URL");
|
||||
let url = Url::parse(
|
||||
user_provided
|
||||
.as_ref()
|
||||
.map_or("http://localhost:3000", |v| v),
|
||||
)
|
||||
.expect("failed to parse URL");
|
||||
info!("using Drop server url {url}");
|
||||
url
|
||||
}
|
||||
|
||||
@@ -1,11 +0,0 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use serde::Deserialize;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct DropChunk {
|
||||
pub ids: Vec<String>,
|
||||
pub lengths: Vec<usize>,
|
||||
}
|
||||
|
||||
pub type DropletManifest = HashMap<String, DropChunk>;
|
||||
+93
-102
@@ -1,21 +1,44 @@
|
||||
use std::{env, sync::LazyLock};
|
||||
|
||||
use anyhow::{Result, anyhow};
|
||||
use async_trait::async_trait;
|
||||
use reqwest::StatusCode;
|
||||
use droplet_rs::manifest::Manifest;
|
||||
use log::info;
|
||||
use reqwest::{Client, ClientBuilder, StatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
use crate::{manifest::DropletManifest, util::ErrorOption};
|
||||
use crate::{state::AppInitData, util::ErrorOption};
|
||||
|
||||
static CLIENT: LazyLock<Client> = LazyLock::new(|| {
|
||||
ClientBuilder::new()
|
||||
.build()
|
||||
.expect("failed to build client")
|
||||
});
|
||||
|
||||
static REMOTE_URL: LazyLock<Url> = LazyLock::new(|| {
|
||||
let user_provided = env::var("DROP_SERVER_URL");
|
||||
let url = Url::parse(
|
||||
user_provided
|
||||
.as_ref()
|
||||
.map_or("http://localhost:3000", |v| v),
|
||||
)
|
||||
.expect("failed to parse URL");
|
||||
info!("using Drop server url {}", url);
|
||||
url
|
||||
});
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ContextResponseBody {
|
||||
pub manifest: DropletManifest,
|
||||
pub library_id: String,
|
||||
pub struct VersionResponseBody {
|
||||
pub manifest: Manifest,
|
||||
pub library: LibrarySource,
|
||||
pub library_path: String,
|
||||
pub version_path: String,
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
pub struct ContextQuery {
|
||||
pub struct VersionQuery {
|
||||
game: String,
|
||||
version: String,
|
||||
}
|
||||
@@ -34,110 +57,78 @@ pub struct LibrarySource {
|
||||
pub backend: LibraryBackend,
|
||||
}
|
||||
|
||||
pub struct DropContextProvider {
|
||||
client: reqwest::Client,
|
||||
base_url: Url,
|
||||
}
|
||||
impl DropContextProvider {
|
||||
pub fn new(url: Url) -> Self {
|
||||
Self {
|
||||
client: reqwest::Client::new(),
|
||||
base_url: url,
|
||||
}
|
||||
}
|
||||
}
|
||||
#[async_trait]
|
||||
impl ContextProvider for DropContextProvider {
|
||||
async fn fetch_context(
|
||||
&self,
|
||||
token: String,
|
||||
game_id: String,
|
||||
version_name: String,
|
||||
) -> Result<ContextResponseBody, ErrorOption> {
|
||||
let context_response = self
|
||||
.client
|
||||
.get(self.base_url.join("/api/v1/admin/depot/context")?)
|
||||
.query(&ContextQuery {
|
||||
game: game_id,
|
||||
version: version_name,
|
||||
})
|
||||
.header("Authorization", format!("Bearer {token}"))
|
||||
.send()
|
||||
.await?;
|
||||
pub async fn fetch_version_data(
|
||||
init_data: &AppInitData,
|
||||
game_id: String,
|
||||
version_id: String,
|
||||
) -> Result<VersionResponseBody, ErrorOption> {
|
||||
let version_data_response = CLIENT
|
||||
.get(REMOTE_URL.join("/api/v1/admin/depot/manifest")?)
|
||||
.query(&VersionQuery {
|
||||
game: game_id,
|
||||
version: version_id,
|
||||
})
|
||||
.header("Authorization", format!("Bearer {}", init_data.key))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !context_response.status().is_success() {
|
||||
if context_response.status() == StatusCode::BAD_REQUEST {
|
||||
return Err(StatusCode::NOT_FOUND.into());
|
||||
}
|
||||
|
||||
return Err(anyhow!(
|
||||
"Fetching context failed with non-success code: {}, {}",
|
||||
context_response.status(),
|
||||
context_response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or("(failed to read body)".to_owned())
|
||||
)
|
||||
.into());
|
||||
if !version_data_response.status().is_success() {
|
||||
if version_data_response.status() == StatusCode::BAD_REQUEST {
|
||||
return Err(StatusCode::NOT_FOUND.into());
|
||||
}
|
||||
|
||||
let context: ContextResponseBody = context_response.json().await?;
|
||||
|
||||
Ok(context)
|
||||
return Err(anyhow!(
|
||||
"Fetching context failed with non-success code: {}, {}",
|
||||
version_data_response.status(),
|
||||
version_data_response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or("(failed to read body)".to_owned())
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let version_data: VersionResponseBody = version_data_response.json().await?;
|
||||
|
||||
Ok(version_data)
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait ContextProvider: Send + Sync {
|
||||
/// Fetches the manifest for a specific game version.
|
||||
async fn fetch_context(
|
||||
&self,
|
||||
token: String,
|
||||
game_id: String,
|
||||
version_name: String,
|
||||
) -> Result<ContextResponseBody, ErrorOption>;
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SkeletonVersion {
|
||||
pub version_id: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait LibraryConfigurationProvider: Send + Sync {
|
||||
async fn fetch_sources(&self, token: &String) -> anyhow::Result<Vec<LibrarySource>>;
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SkeletonGame {
|
||||
pub id: String,
|
||||
pub versions: Vec<SkeletonVersion>,
|
||||
}
|
||||
pub struct DropLibraryProvider {
|
||||
client: reqwest::Client,
|
||||
base_url: Url,
|
||||
}
|
||||
impl DropLibraryProvider {
|
||||
pub fn new(url: Url) -> Self {
|
||||
Self {
|
||||
client: reqwest::Client::new(),
|
||||
base_url: url,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl LibraryConfigurationProvider for DropLibraryProvider {
|
||||
async fn fetch_sources(&self, token: &String) -> anyhow::Result<Vec<LibrarySource>> {
|
||||
let source_response = self
|
||||
.client
|
||||
.get(self.base_url.join("/api/v1/admin/library/sources")?)
|
||||
.header("Authorization", format!("Bearer {token}"))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !source_response.status().is_success() {
|
||||
return Err(anyhow!(
|
||||
"Fetching library sources failed with non-success code: {}, {}",
|
||||
source_response.status(),
|
||||
source_response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or("(failed to read body)".to_owned())
|
||||
));
|
||||
}
|
||||
|
||||
let library_sources: Vec<LibrarySource> = source_response.json().await?;
|
||||
|
||||
Ok(library_sources)
|
||||
|
||||
pub async fn fetch_instance_games(
|
||||
init_data: &AppInitData,
|
||||
) -> Result<Vec<SkeletonGame>, ErrorOption> {
|
||||
let context_response = CLIENT
|
||||
.get(REMOTE_URL.join("/api/v1/admin/depot/versions")?)
|
||||
.header("Authorization", format!("Bearer {}", init_data.key))
|
||||
.send()
|
||||
.await?;
|
||||
|
||||
if !context_response.status().is_success() {
|
||||
|
||||
return Err(anyhow!(
|
||||
"Fetching instance games failed with non-success code: {}, {}",
|
||||
context_response.status(),
|
||||
context_response
|
||||
.text()
|
||||
.await
|
||||
.unwrap_or("(failed to read body)".to_owned())
|
||||
)
|
||||
.into());
|
||||
}
|
||||
|
||||
let games: Vec<SkeletonGame> = context_response.json().await?;
|
||||
|
||||
Ok(games)
|
||||
}
|
||||
|
||||
+47
-34
@@ -1,58 +1,72 @@
|
||||
use std::sync::Arc;
|
||||
use std::{io::Error, rc::Rc, sync::Arc};
|
||||
|
||||
use aes::cipher::{KeyIvInit, StreamCipher};
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::{Path, State},
|
||||
http::HeaderMap,
|
||||
response::{AppendHeaders, IntoResponse},
|
||||
};
|
||||
use bytes::Bytes;
|
||||
use dashmap::{DashMap, mapref::one::RefMut};
|
||||
use droplet_rs::versions::types::{MinimumFileObject, VersionFile};
|
||||
use droplet_rs::{
|
||||
manifest::ChunkData,
|
||||
versions::types::{MinimumFileObject, VersionFile},
|
||||
};
|
||||
use futures_util::{StreamExt, stream};
|
||||
use log::{error, info};
|
||||
use reqwest::{StatusCode, header};
|
||||
use tokio::sync::SemaphorePermit;
|
||||
use tokio_util::io::ReaderStream;
|
||||
use futures_util::{StreamExt as _, stream};
|
||||
|
||||
|
||||
use crate::{
|
||||
DownloadContext, GLOBAL_CONTEXT_SEMAPHORE, download::create_download_context, state::AppState,
|
||||
};
|
||||
|
||||
type Aes128Ctr64LE = ctr::Ctr64LE<aes::Aes128>;
|
||||
|
||||
pub async fn serve_file(
|
||||
State(state): State<Arc<AppState>>,
|
||||
Path((game_id, version_name, chunk_ids)): Path<(String, String, String)>,
|
||||
Path((game_id, version_name, chunk_id)): Path<(String, String, String)>,
|
||||
) -> Result<impl IntoResponse, StatusCode> {
|
||||
let context_cache = &state.context_cache;
|
||||
|
||||
let mut context = get_or_generate_context(&state, context_cache, game_id, version_name).await?;
|
||||
let mut context = get_or_create_context(&state, context_cache, game_id, version_name).await?;
|
||||
context.reset_last_access();
|
||||
|
||||
let chunk_ids = chunk_ids.split("/").collect::<Vec<&str>>();
|
||||
let mut streams = Vec::with_capacity(chunk_ids.len());
|
||||
let mut content_lengths = Vec::with_capacity(chunk_ids.len());
|
||||
let mut total_size = 0;
|
||||
for chunk_id in chunk_ids {
|
||||
let (relative_filename, start, end) = lookup_chunk(chunk_id, &context)?;
|
||||
let reader = get_file_reader(&mut context, relative_filename, start, end).await?;
|
||||
let chunk_data = lookup_chunk(&chunk_id, &context)?;
|
||||
let mut streams = Vec::with_capacity(chunk_data.files.len());
|
||||
let mut content_length = 0;
|
||||
|
||||
for file_entry in &chunk_data.files {
|
||||
let reader = get_file_reader(
|
||||
&mut context,
|
||||
file_entry.filename.clone(),
|
||||
file_entry.start,
|
||||
file_entry.start + file_entry.length,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let stream = ReaderStream::new(reader);
|
||||
streams.push(stream);
|
||||
content_lengths.push((end - start).to_string());
|
||||
|
||||
total_size += end - start;
|
||||
content_length += file_entry.length;
|
||||
}
|
||||
|
||||
let stream = stream::iter(streams).flatten();
|
||||
let body: Body = Body::from_stream(stream);
|
||||
let mut cipher = Aes128Ctr64LE::new(&context.manifest.key.into(), &chunk_data.iv.into());
|
||||
let encrypted_stream = stream.chunks(3).map(move |raw| -> Result<Bytes, Error> {
|
||||
let data: Result<Vec<Bytes>, Error> = raw.into_iter().collect();
|
||||
let mut data = data?.concat();
|
||||
|
||||
cipher.apply_keystream(&mut data);
|
||||
|
||||
Ok(data.into())
|
||||
});
|
||||
let body: Body = Body::from_stream(encrypted_stream);
|
||||
|
||||
let mut headers = HeaderMap::new();
|
||||
headers.insert("Content-Type", "application/octet-stream".parse().unwrap());
|
||||
headers.insert("Content-Length", total_size.to_string().parse().unwrap());
|
||||
headers.insert(
|
||||
"Content-Lengths",
|
||||
content_lengths.join(",").parse().unwrap(),
|
||||
);
|
||||
headers.insert("Content-Length", content_length.into());
|
||||
|
||||
Ok((headers, body))
|
||||
}
|
||||
@@ -62,12 +76,16 @@ async fn acquire_permit<'a>() -> SemaphorePermit<'a> {
|
||||
.await
|
||||
.expect("failed to acquire semaphore");
|
||||
}
|
||||
/**
|
||||
* Needs to be cloned for reference reasons
|
||||
*/
|
||||
fn lookup_chunk(
|
||||
chunk_id: &str,
|
||||
context: &RefMut<'_, (String, String), DownloadContext>,
|
||||
) -> Result<(String, usize, usize), StatusCode> {
|
||||
) -> Result<ChunkData, StatusCode> {
|
||||
context
|
||||
.chunk_lookup_table
|
||||
.manifest
|
||||
.chunks
|
||||
.get(chunk_id)
|
||||
.cloned()
|
||||
.ok_or(StatusCode::NOT_FOUND)
|
||||
@@ -91,11 +109,11 @@ async fn get_file_reader(
|
||||
)
|
||||
.await
|
||||
.map_err(|v| {
|
||||
error!("reader error: {v:?}");
|
||||
error!("reader error for '{}': {v:?}", relative_filename);
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})
|
||||
}
|
||||
async fn get_or_generate_context<'a>(
|
||||
async fn get_or_create_context<'a>(
|
||||
state: &Arc<AppState>,
|
||||
context_cache: &'a DashMap<(String, String), DownloadContext>,
|
||||
game_id: String,
|
||||
@@ -114,14 +132,9 @@ async fn get_or_generate_context<'a>(
|
||||
Ok(already_done)
|
||||
} else {
|
||||
info!("generating context for {}...", game_id);
|
||||
let context_result = create_download_context(
|
||||
&*state.metadata_provider,
|
||||
&*state.backend_factory,
|
||||
initialisation_data,
|
||||
game_id.clone(),
|
||||
version_name.clone(),
|
||||
)
|
||||
.await?;
|
||||
let context_result =
|
||||
create_download_context(initialisation_data, game_id.clone(), version_name.clone())
|
||||
.await?;
|
||||
|
||||
state.context_cache.insert(key.clone(), context_result);
|
||||
|
||||
|
||||
+6
-26
@@ -1,38 +1,18 @@
|
||||
use std::{collections::HashMap, path::PathBuf, sync::Arc};
|
||||
use std::{collections::HashMap, path::PathBuf};
|
||||
|
||||
use dashmap::DashMap;
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
use crate::{
|
||||
BackendFactory, DownloadContext, LibraryConfigurationProvider, ContextProvider,
|
||||
remote::LibraryBackend,
|
||||
};
|
||||
use crate::
|
||||
DownloadContext
|
||||
;
|
||||
|
||||
pub struct AppState {
|
||||
pub token: OnceCell<AppInitData>,
|
||||
pub context_cache: DashMap<(String, String), DownloadContext>,
|
||||
|
||||
pub metadata_provider: Arc<dyn ContextProvider>,
|
||||
pub backend_factory: Arc<dyn BackendFactory>,
|
||||
pub library_provider: Arc<dyn LibraryConfigurationProvider>,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AppInitData {
|
||||
token: String,
|
||||
libraries: HashMap<String, (PathBuf, LibraryBackend)>,
|
||||
}
|
||||
impl AppInitData {
|
||||
pub fn new(token: String, libraries: HashMap<String, (PathBuf, LibraryBackend)>) -> Self {
|
||||
Self { token, libraries }
|
||||
}
|
||||
pub fn token(&self) -> String {
|
||||
self.token.clone()
|
||||
}
|
||||
pub fn set_token(&mut self, token: String) {
|
||||
self.token = token
|
||||
}
|
||||
pub fn libraries(&self) -> &HashMap<String, (PathBuf, LibraryBackend)> {
|
||||
&self.libraries
|
||||
}
|
||||
}
|
||||
pub key: String,
|
||||
}
|
||||
+8
-47
@@ -1,16 +1,15 @@
|
||||
use std::{collections::HashMap, path::PathBuf, str::FromStr, sync::Arc};
|
||||
use std::sync::Arc;
|
||||
|
||||
use axum::{Json, extract::State};
|
||||
use log::{error, info};
|
||||
use reqwest::StatusCode;
|
||||
use serde::Deserialize;
|
||||
|
||||
use crate::remote::{self, LibraryBackend, LibrarySource};
|
||||
use crate::state::{AppInitData, AppState};
|
||||
|
||||
#[derive(Deserialize)]
|
||||
pub struct TokenPayload {
|
||||
token: String,
|
||||
key: String,
|
||||
}
|
||||
|
||||
pub async fn set_token(
|
||||
@@ -21,20 +20,9 @@ pub async fn set_token(
|
||||
return Ok(StatusCode::OK);
|
||||
}
|
||||
|
||||
let token = payload.token;
|
||||
let key = payload.key;
|
||||
|
||||
let library_sources = state
|
||||
.library_provider
|
||||
.fetch_sources(&token)
|
||||
.await
|
||||
.map_err(|v| {
|
||||
error!("{v:?}");
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
})?;
|
||||
|
||||
let valid_library_sources = filter_library_sources(library_sources);
|
||||
|
||||
set_generated_token(&state, token, valid_library_sources)?;
|
||||
set_depot_key(&state, key)?;
|
||||
|
||||
info!("connected to drop server successfully");
|
||||
|
||||
@@ -44,48 +32,21 @@ pub async fn set_token(
|
||||
fn check_token_exists(state: &Arc<AppState>, payload: &TokenPayload) -> bool {
|
||||
if let Some(existing_data) = state.token.get() {
|
||||
assert!(
|
||||
*existing_data.token() == payload.token,
|
||||
*existing_data.key == payload.key,
|
||||
"already set up but provided with a different token"
|
||||
);
|
||||
return true;
|
||||
}
|
||||
false
|
||||
}
|
||||
fn filter_library_sources(
|
||||
library_sources: Vec<LibrarySource>,
|
||||
) -> HashMap<String, (PathBuf, LibraryBackend)> {
|
||||
library_sources
|
||||
.into_iter()
|
||||
.filter(|v| {
|
||||
matches!(
|
||||
v.backend,
|
||||
remote::LibraryBackend::Filesystem | remote::LibraryBackend::FlatFilesystem
|
||||
)
|
||||
})
|
||||
.map(|v| {
|
||||
let path = PathBuf::from_str(
|
||||
v.options
|
||||
.as_object()
|
||||
.unwrap()
|
||||
.get("baseDir")
|
||||
.unwrap()
|
||||
.as_str()
|
||||
.unwrap(),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
(v.id, (path, v.backend))
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
fn set_generated_token(
|
||||
fn set_depot_key(
|
||||
state: &Arc<AppState>,
|
||||
token: String,
|
||||
libraries: HashMap<String, (PathBuf, LibraryBackend)>,
|
||||
key: String
|
||||
) -> Result<(), StatusCode> {
|
||||
state
|
||||
.token
|
||||
.set(AppInitData::new(token, libraries))
|
||||
.set(AppInitData { key })
|
||||
.map_err(|err| {
|
||||
error!("failed to set token: {err:?}");
|
||||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
|
||||
Reference in New Issue
Block a user