feat: Migrate to Apache opendal

This commit is contained in:
quexeky
2026-01-25 21:04:03 +11:00
parent 2518d9e023
commit 8c8e9ad4c9
20 changed files with 443 additions and 772 deletions
+242 -445
View File
File diff suppressed because it is too large Load Diff
+2 -2
View File
@@ -16,12 +16,12 @@ fern = { version = "0.7.1", features = ["colored"] }
futures = "0.3.31"
indicatif = "0.18.3"
log = "0.4.29"
opendal = { version = "0.55.0", features = ["services-s3"] }
rand = "0.9.2"
reqwest = { version = "0.13.1", features = ["json"] }
rust-s3 = "0.37.1"
serde = { version = "1.0.228", features = ["derive"] }
serde_json = "1.0.148"
tokio = { version = "1.48.0", features = ["fs", "macros"] }
tokio-util = "0.7.18"
tokio-util = { version = "0.7.18", features = ["compat"] }
url = "2.5.8"
webbrowser = "1.0.6"
+2 -2
View File
@@ -1,6 +1,6 @@
use clap::{Args, Parser, Subcommand, ValueEnum};
use crate::commands::configure::config_option::ConfigOptionCli;
use crate::commands::connect::config_option::ConfigOptionCli;
#[derive(Parser)]
#[command(version, about, long_about = None)]
@@ -16,7 +16,7 @@ pub struct Cli {
#[derive(Subcommand)]
pub enum Commands {
/// Configures downpour endpoints
Configure {
Connect {
#[arg(short, long)]
name: String,
#[command(subcommand)]
@@ -1,18 +0,0 @@
use clap::Subcommand;
use serde::{Deserialize, Serialize};
use crate::commands::configure::{
s3::{S3Config, S3ConfigCli},
server::{ServerConfig, ServerConfigCli},
};
#[derive(Subcommand, Clone)]
pub enum ConfigOptionCli {
Server(ServerConfigCli),
S3(S3ConfigCli),
}
#[derive(Serialize, Deserialize, Clone)]
pub enum ConfigOption {
Server(ServerConfig),
S3(S3Config),
}
@@ -1,5 +0,0 @@
use crate::commands::configure::config_option::ConfigOption;
pub trait Configurable {
async fn configure(self) -> anyhow::Result<ConfigOption>;
}
-65
View File
@@ -1,65 +0,0 @@
use std::str::FromStr;
use clap::Args;
use s3::{Bucket, Region, creds::Credentials};
use serde::{Deserialize, Serialize};
use crate::{
commands::configure::{config_option::ConfigOption, configurable::Configurable},
interactive_optional_variable, interactive_variable,
};
#[derive(Args, Clone)]
pub struct S3ConfigCli {
secret_key: Option<String>,
key_id: Option<String>,
region: Option<String>,
bucket_name: Option<String>,
endpoint: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct S3Config {
secret_key: String,
key_id: String,
region: String,
bucket_name: String,
endpoint: Option<String>,
}
impl Configurable for S3ConfigCli {
async fn configure(self) -> anyhow::Result<ConfigOption> {
interactive_variable!(self, secret_key, "S3 Secret Key");
interactive_variable!(self, key_id, "S3 Key ID");
interactive_variable!(self, region, "S3 Region");
interactive_variable!(self, bucket_name, "S3 Bucket Name");
interactive_optional_variable!(self, endpoint, "S3 Endpoint (leave blank for none");
Ok(ConfigOption::S3(S3Config {
secret_key,
key_id,
region,
bucket_name,
endpoint,
}))
}
}
impl S3Config {
pub fn generate_bucket(&self) -> anyhow::Result<s3::Bucket> {
let credentials =
Credentials::new(Some(&self.key_id), Some(&self.secret_key), None, None, None)?;
let region = if let Some(endpoint) = &self.endpoint {
Region::Custom {
region: self.region.clone(),
endpoint: endpoint.clone(),
}
} else {
Region::from_str(&self.region)?
};
let bucket = Bucket::new(&self.bucket_name, region, credentials)?;
Ok(*bucket)
}
}
-92
View File
@@ -1,92 +0,0 @@
use clap::Args;
use serde::{Deserialize, Serialize};
use std::sync::LazyLock;
use anyhow::{Result, anyhow};
use dialoguer::{Confirm, Input, theme::ColorfulTheme};
use reqwest::Client;
use url::Url;
use crate::commands::configure::{config_option::ConfigOption, configurable::Configurable};
#[derive(Serialize, Deserialize, Clone)]
pub struct ServerConfig {
url: String,
token: String,
}
#[derive(Args, Clone)]
pub struct ServerConfigCli {
/// Endpoint of the Drop server
url: String,
#[arg(short, long)]
token: Option<String>,
}
const TOKEN_CREATE_PAYLOAD: &str =
"eyJuYW1lIjoiZG93bnBvdXIgKGNsaSkiLCJhY2xzIjpbImRlcG90Om5ldyJdfQ==";
impl Configurable for ServerConfigCli {
async fn configure(self) -> anyhow::Result<ConfigOption> {
let base_url = Url::parse(&self.url)?;
let mut token_create_url = base_url.join("/admin/settings/tokens")?;
{
let mut query = token_create_url.query_pairs_mut();
query.append_pair("payload", TOKEN_CREATE_PAYLOAD);
};
let confirm = Confirm::with_theme(&ColorfulTheme::default())
.with_prompt(format!(
"Open \"{}\" in your default browser?",
token_create_url.as_str()
))
.interact()?;
if !confirm {
return Err(anyhow!("User cancelled action"));
}
webbrowser::open(token_create_url.as_str())?;
let token: String = Input::with_theme(&ColorfulTheme::default())
.with_prompt("API token")
.interact_text()?;
validate_configuration(&self.url, &token).await?;
Ok(ConfigOption::Server(ServerConfig {
url: self.url,
token,
}))
}
}
static CLIENT: LazyLock<Client> = LazyLock::new(|| reqwest::Client::new());
const REQUIRED_ACLS: [&str; 1] = ["depot:new"];
pub async fn validate_configuration(url: &str, token: &str) -> Result<()> {
let base_url = Url::parse(&url)?;
let token_check_url = base_url.join("/api/v1/token")?;
let acl_check = CLIENT
.get(token_check_url)
.bearer_auth(token)
.send()
.await?;
if !acl_check.status().is_success() {
return Err(anyhow!(
"ACL check failed with response code: {}",
acl_check.status()
));
}
let acls: Vec<String> = acl_check.json().await?;
for acl in REQUIRED_ACLS {
if !acls.contains(&acl.to_string()) {
return Err(anyhow!("Token missing {} acl", acl));
}
}
Ok(())
}
@@ -1,29 +1,32 @@
use crate::commands::configure::{
use crate::{commands::{connect::{
config_option::{ConfigOption, ConfigOptionCli},
configurable::Configurable,
configurable::Configure,
s3::S3Config,
};
}, upload::speedtest::Speedtest}, manifest::DepotManifest};
use dialoguer::{Confirm, theme::ColorfulTheme};
use log::warn;
use futures::AsyncWriteExt;
use log::{debug, info, warn};
use opendal::Operator;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fs};
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use std::{collections::HashMap, fs, ops::Not};
const CONFIG_DIR: &str = "downpour/config.json";
#[derive(Serialize, Deserialize)]
pub struct Config {
items: HashMap<String, ConfigOption>,
configurations: HashMap<String, ConfigOption>,
active_s3: Option<String>,
}
impl Config {
pub fn new() -> Self {
Self {
items: HashMap::new(),
configurations: HashMap::new(),
active_s3: None,
}
}
pub fn exists(&self, name: &String) -> bool {
self.items.contains_key(name)
self.configurations.contains_key(name)
}
pub fn save(&self) -> anyhow::Result<()> {
let json = serde_json::to_string(self)?;
@@ -48,13 +51,13 @@ impl Config {
if matches!(object, ConfigOption::S3(..)) {
self.active_s3 = Some(name.clone())
}
self.items.insert(name, object);
self.configurations.insert(name, object);
self.save().expect("Failed to save config");
}
pub fn get_active_s3(&self) -> Option<S3Config> {
if let Some(active_s3) = &self.active_s3 {
self.items
self.configurations
.iter()
.filter_map(|(name, option)| {
if *name == *active_s3 {
@@ -76,6 +79,9 @@ impl Config {
None
}
}
pub fn get<T: AsRef<String>>(&self, name: T) -> Option<&ConfigOption> {
self.configurations.get(name.as_ref())
}
}
pub async fn manage_configuration(
@@ -94,12 +100,37 @@ pub async fn manage_configuration(
return Err(anyhow::anyhow!("User cancelled action"));
}
}
config.add_item(
name.clone(),
match option {
ConfigOptionCli::Server(server_config) => server_config.clone().configure().await?,
ConfigOptionCli::S3(s3_config_cli) => s3_config_cli.clone().configure().await?,
},
);
let config_option = match option {
ConfigOptionCli::S3(s3_config_cli) => s3_config_cli.clone().configure().await?,
};
config.add_item(name.clone(), config_option.clone());
let operator = config_option.build()?;
generate_speedtest(&operator).await?;
generate_manifest(&operator).await?;
Ok(())
}
async fn generate_speedtest(operator: &Operator) -> anyhow::Result<()> {
if operator.exists("speedtest").await?.not() {
info!("Speedtest already exists on Depot. Skipping speedtest upload...");
return Ok(())
}
let mut writer = operator.writer("speedtest").await?.into_futures_async_write().compat_write();
let mut reader = Speedtest::new();
let written = tokio::io::copy(&mut reader, &mut writer).await?;
debug!("Wrote {} bytes to {:?}", written, operator.info());
writer.into_inner().close().await?;
Ok(())
}
async fn generate_manifest(operator: &Operator) -> anyhow::Result<()> {
info!("Manifest already exists on Depot. Skipping manifest upload...");
if operator.exists("manifest.json").await?.not() {
return Ok(())
}
let data = DepotManifest::new();
operator.write("manifest.json", serde_json::to_string(&data)?).await?;
Ok(())
}
+26
View File
@@ -0,0 +1,26 @@
use clap::Subcommand;
use opendal::{Operator, layers::LoggingLayer};
use serde::{Deserialize, Serialize};
use crate::commands::{
connect::s3::{S3Config, S3ConfigCli},
upload::uploadable::OperatorBuilder,
};
#[derive(Subcommand, Clone)]
pub enum ConfigOptionCli {
S3(S3ConfigCli),
}
#[derive(Serialize, Deserialize, Clone)]
pub enum ConfigOption {
S3(S3Config),
}
impl ConfigOption {
pub fn build(&self) -> anyhow::Result<Operator> {
Ok(match self {
ConfigOption::S3(s3_config) => s3_config.build()?,
}
.layer(LoggingLayer::default()))
}
}
+5
View File
@@ -0,0 +1,5 @@
use crate::commands::connect::config_option::ConfigOption;
pub trait Configure {
async fn configure(self) -> anyhow::Result<ConfigOption>;
}
@@ -8,7 +8,7 @@ macro_rules! interactive_variable {
let $var = if let Some($var) = $value.$var {
$var
} else {
crate::commands::configure::interactive::query_variable($prompt).unwrap()
crate::commands::connect::interactive::query_variable($prompt).unwrap()
};
};
}
@@ -18,7 +18,7 @@ macro_rules! interactive_optional_variable {
let $var = if let Some($var) = $value.$var {
Some($var)
} else {
crate::commands::configure::interactive::query_optional_variable($prompt).unwrap()
crate::commands::connect::interactive::query_optional_variable($prompt).unwrap()
};
};
}
@@ -1,7 +1,6 @@
pub mod config;
pub mod configurable;
pub mod s3;
pub mod server;
#[macro_use]
pub mod interactive;
pub mod config_option;
+66
View File
@@ -0,0 +1,66 @@
use clap::Args;
use opendal::Operator;
use serde::{Deserialize, Serialize};
use crate::{
commands::{
connect::{config_option::ConfigOption, configurable::Configure},
upload::uploadable::OperatorBuilder,
},
interactive_variable,
};
#[derive(Args, Clone)]
pub struct S3ConfigCli {
key_id: Option<String>,
secret_key: Option<String>,
endpoint: Option<String>,
region: Option<String>,
bucket_name: Option<String>,
root: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct S3Config {
key_id: String,
secret_key: String,
endpoint: String,
region: String,
bucket_name: String,
root: Option<String>,
}
impl Configure for S3ConfigCli {
async fn configure(self) -> anyhow::Result<ConfigOption> {
interactive_variable!(self, key_id, "S3 Key ID");
interactive_variable!(self, secret_key, "S3 Secret Key");
interactive_variable!(self, region, "S3 Region");
interactive_variable!(self, bucket_name, "S3 Bucket Name");
interactive_variable!(self, endpoint, "S3 Endpoint");
Ok(ConfigOption::S3(S3Config {
secret_key,
key_id,
region,
bucket_name,
endpoint,
root: self.root,
}))
}
}
impl OperatorBuilder for S3Config {
fn build(&self) -> anyhow::Result<Operator> {
let builder = opendal::services::S3::default()
.access_key_id(&self.key_id)
.secret_access_key(&self.secret_key)
.region(&self.region)
.endpoint(&self.endpoint)
.root(self.root.as_ref().map(|s| s.as_str()).unwrap_or("/"))
.bucket(&self.bucket_name)
.disable_config_load();
let op: Operator = Operator::new(builder)?.finish();
Ok(op)
}
}
+1 -1
View File
@@ -1,2 +1,2 @@
pub mod configure;
pub mod connect;
pub mod upload;
+9 -17
View File
@@ -2,11 +2,11 @@ use std::path::{Path, PathBuf};
use crate::{
cli::UploadInfo,
commands::configure::config::Config,
commands::upload::{s3::S3, uploadable::Uploadable},
commands::{connect::config::Config, upload::{chunk_reader::ChunkReader, uploadable::OperatorBuilder}},
manifest::generate_manifest,
};
use log::info;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
pub async fn upload(info: &UploadInfo, config: Config) -> anyhow::Result<()> {
let game_id = &info.game_id;
@@ -14,27 +14,19 @@ pub async fn upload(info: &UploadInfo, config: Config) -> anyhow::Result<()> {
let version_id = &info.version_id;
let manifest = generate_manifest(&Path::new(path)).await?;
let mut uploader: Box<dyn Uploadable> = match info.upload_style {
crate::cli::UploadStyle::S3 => Box::new(S3::new(
&config
let operator = match info.upload_style {
crate::cli::UploadStyle::S3 => config
.get_active_s3()
.ok_or(anyhow::Error::msg("Could not get active S3 value"))?,
)?),
.ok_or(anyhow::Error::msg("Could not get active S3 value"))?.build()?,
};
info!("Uploading chunks");
for (id, data) in &manifest.chunks {
info!("Uploading chunk id {id}");
uploader.upload_chunk(PathBuf::from(path), game_id, version_id, id, data).await?;
let mut reader = ChunkReader::new(&path, data);
let mut writer = operator.writer(&format!("{game_id}/{version_id}/{id}")).await?.into_futures_async_write().compat_write();
tokio::io::copy(&mut reader, &mut writer);
}
info!("Finished uploading chunks");
info!("Uploading manifest");
uploader
.upload_manifest(manifest, game_id, version_id)
.await?;
info!("Uploading speedtest");
uploader.upload_speedtest().await?;
Ok(())
}
-1
View File
@@ -1,5 +1,4 @@
pub mod interface;
pub mod s3;
pub mod speedtest;
pub mod uploadable;
pub mod chunk_reader;
-84
View File
@@ -1,84 +0,0 @@
use crate::commands::{
configure::s3::S3Config,
upload::{
chunk_reader::ChunkReader,
speedtest::{SPEEDTEST_PATH, Speedtest},
uploadable::Uploadable,
},
};
use async_trait::async_trait;
use droplet_rs::{
manifest::{ChunkData, Manifest},
};
use s3::Bucket;
use serde_json::json;
use std::{ops::Deref, path::PathBuf};
pub struct S3 {
bucket: s3::Bucket,
}
impl S3 {
pub fn new(config: &S3Config) -> anyhow::Result<Self> {
Ok(Self {
bucket: config.generate_bucket()?,
})
}
}
#[async_trait]
impl Uploadable for S3 {
async fn upload_chunk(
&mut self,
base_path: PathBuf,
id: &String,
version: &String,
chunk_id: &String,
chunk: &ChunkData,
) -> anyhow::Result<()> {
let path = &PathBuf::from(id)
.join(version)
.join(chunk_id)
.to_string_lossy()
.to_string();
let mut reader = ChunkReader::new(&base_path, chunk);
self.put_object_stream(&mut reader, &path).await?;
Ok(())
}
async fn upload_speedtest(&mut self) -> anyhow::Result<()> {
if self.object_exists(SPEEDTEST_PATH).await? {
return Ok(());
}
println!("Uploading speedtest");
let mut speedtest = Speedtest::new();
self.put_object_stream(&mut speedtest, SPEEDTEST_PATH)
.await?;
Ok(())
}
async fn upload_manifest(
&mut self,
manifest: Manifest,
game_id: &String,
version_id: &String,
) -> anyhow::Result<()> {
self.put_object_builder(
PathBuf::from(game_id)
.join(version_id)
.join("manifest.json")
.to_string_lossy()
.to_string(),
json!(manifest).to_string().as_bytes(),
)
.with_content_type("application/json")
.execute()
.await?;
Ok(())
}
}
impl Deref for S3 {
type Target = Bucket;
fn deref(&self) -> &Self::Target {
&self.bucket
}
}
+3 -16
View File
@@ -2,22 +2,9 @@ use std::path::PathBuf;
use async_trait::async_trait;
use droplet_rs::manifest::{ChunkData, Manifest};
use opendal::Operator;
#[async_trait]
pub trait Uploadable {
async fn upload_chunk(
&mut self,
base_path: PathBuf,
id: &String,
version: &String,
chunk_id: &String,
chunk: &ChunkData,
) -> anyhow::Result<()>;
async fn upload_speedtest(&mut self) -> anyhow::Result<()>;
async fn upload_manifest(
&mut self,
manifest: Manifest,
game_id: &String,
version_id: &String,
) -> anyhow::Result<()>;
pub trait OperatorBuilder {
fn build(&self) -> anyhow::Result<Operator>;
}
+3 -3
View File
@@ -1,7 +1,7 @@
use crate::commands::configure::config::manage_configuration;
use crate::commands::connect::config::manage_configuration;
use crate::{
cli::{Cli, Commands},
commands::configure::config::Config,
commands::connect::config::Config,
commands::upload,
};
use clap::Parser;
@@ -18,7 +18,7 @@ async fn main() -> anyhow::Result<()> {
let mut config = Config::read();
match &cli.command {
Commands::Configure { name, option } => {
Commands::Connect { name, option } => {
manage_configuration(&mut config, name, option).await?
}
Commands::Upload(info) => {
+34 -1
View File
@@ -1,7 +1,40 @@
use std::path::Path;
use std::{collections::HashMap, path::Path};
use droplet_rs::manifest::{Manifest, generate_manifest_rusty};
use indicatif::{ProgressBar, ProgressStyle};
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize)]
pub struct DepotManifest {
content: HashMap<String, DepotManifestGameData>,
}
#[derive(Serialize, Deserialize)]
struct DepotManifestGameData {
version_id: String,
compression: CompressionOption,
}
#[derive(Serialize, Deserialize)]
pub enum CompressionOption {
None,
Gzip,
Zstd,
}
impl DepotManifest {
pub fn new() -> Self {
Self {
content: HashMap::new(),
}
}
pub fn add(&mut self, game_id: String, version_id: String, compression: CompressionOption) {
self.content.insert(
game_id,
DepotManifestGameData {
version_id,
compression,
},
);
}
}
pub async fn generate_manifest(dir: &Path) -> anyhow::Result<Manifest> {
let progress_bar = ProgressBar::new(100_00).with_style(