In-app store, torrential backend, locales (#332)

* feat: add store nav and fixes

* fix: reduce password requirement & new task error ui

* fix: client webtoken fix

* fix: delta versions and dockerfile

* fix: use setup platforms for filter & display

* fix: setup not accounted when returning valid options

* feat: tighter delta version support

* feat: dl/disk size

* feat: offload manifest generation to torrential

* fix: bump torrential

* feat: remove droplet

* feat: bump torrential

* feat: convert locales
This commit is contained in:
DecDuck
2026-02-06 00:12:24 +11:00
committed by GitHub
parent 837bc6eb1d
commit d234f8df33
82 changed files with 1737 additions and 967 deletions
+5 -1
View File
@@ -38,7 +38,7 @@ export class Service<T> {
private setup: Setup | undefined;
private healthcheck: Healthcheck | undefined;
private logger: Logger<never>;
logger: Logger<never>;
private currentProcess: ChildProcess | undefined;
@@ -90,6 +90,7 @@ export class Service<T> {
if (!process.env[disableEnv]) {
const serviceProcess = this.executor();
this.logger.info("service launched");
serviceProcess.on("close", async (code, signal) => {
serviceProcess.kill();
this.currentProcess = undefined;
@@ -99,12 +100,15 @@ export class Service<T> {
await new Promise((r) => setTimeout(r, 5000));
if (this.spun) this.launch();
});
serviceProcess.stdout?.on("data", (data) =>
this.logger.info(data.toString().trim()),
);
serviceProcess.stderr?.on("data", (data) =>
this.logger.error(data.toString().trim()),
);
this.currentProcess = serviceProcess;
}
@@ -1,78 +0,0 @@
import { spawn } from "child_process";
import { Service } from "..";
import fs from "fs";
import prisma from "../../db/database";
import { logger } from "../../logging";
import { systemConfig } from "../../config/sys-conf";
const INTERNAL_DEPOT_URL = new URL(
process.env.INTERNAL_DEPOT_URL ?? "http://localhost:5000",
);
export const TORRENTIAL_SERVICE = new Service(
"torrential",
() => {
const localDir = fs.readdirSync(".");
if ("torrential" in localDir) {
const stat = fs.statSync("./torrential");
if (stat.isDirectory()) {
// in dev and we have the submodule
logger.info(
"torrential detected in development mode - building from source",
);
return spawn(
"cargo run --manifest-path ./torrential/Cargo.toml",
[],
{},
);
} else {
// binary
return spawn("./torrential", [], {});
}
}
const envPath = process.env.TORRENTIAL_PATH;
if (envPath) return spawn(envPath, [], {});
return spawn("torrential", [], {});
},
async () => {
const externalUrl = systemConfig.getExternalUrl();
const depot = await prisma.depot.upsert({
where: {
id: "torrential",
},
update: {
endpoint: `${externalUrl}/api/v1/depot`,
},
create: {
id: "torrential",
endpoint: `${externalUrl}/api/v1/depot`,
},
});
await $fetch(`${INTERNAL_DEPOT_URL.toString()}key`, {
method: "POST",
body: { key: depot.key },
});
return true;
},
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
async () => await $fetch(`${INTERNAL_DEPOT_URL.toString()}healthcheck`),
{
async invalidate(gameId: string, versionId: string) {
try {
await $fetch(`${INTERNAL_DEPOT_URL.toString()}invalidate`, {
method: "POST",
body: {
game: gameId,
version: versionId,
},
});
} catch (e) {
logger.warn("invalidate torrential cache failed with error: " + e);
}
},
},
);
@@ -0,0 +1,27 @@
# torrential service
The role of torrential has expanded recently to be the source of ALL Rust/native execution within Drop, to avoid using the buggy napi.rs `droplet` package.
It communicates over `127.0.0.1:33148`, which the service connects to and stores the socket handle to.
## message format
Each message is prefixed with an 8 byte little-endian unsigned integer that dictates the length of the message. Then, they are wrapped in the respective DropBound or TorrentialBound wrappers, which contain the type and data fields, which dictate which sub-message they are deserialized into.
## query processors
**Note: "Query" is the old name for a DropBound message**
The service allows you to configure a series of query processors that match based on type and recieve the raw message to deserialize themselves. They can optionally return a response message, which automatically gets returned and wrapped.
## message ids
All messages in the pipe have a message ID which dictates which "request" they're for. Queries and responses (DropBound and TorrentialBound) carry the same message ID if they are related.
## old `/api/v1/admin/depot/torrential/*` routes
They've been turned into query and response messages as described above.
# torrential service internals
We use a read buffer to queue up enough bytes that we can deserialize the entire message at once. When a chunk comes in, we append it to the current readbuf, and then check if we have enough bytes to assemble the length header and it's associated packet. If we do, we deserialize, cut off the bytes, and fire off all the necessary handlers for that packet.
@@ -0,0 +1,361 @@
import type { Message } from "@bufbuild/protobuf";
import { create, fromBinary } from "@bufbuild/protobuf";
import {
ClientCertQuerySchema,
ClientCertResponseSchema,
GenerateManifestSchema,
HasBackendQuerySchema,
HasBackendResponseSchema,
ListFilesQuerySchema,
ListFilesResponseSchema,
ManifestCompleteSchema,
ManifestLogSchema,
ManifestProgressSchema,
PeekFileQuerySchema,
PeekFileResponseSchema,
RootCertQuerySchema,
RootCertResponseSchema,
RpcErrorSchema,
} from "../../proto/torrential/proto/droplet_pb";
import type { QueryProcessor } from ".";
import TORRENTIAL_SERVICE from ".";
import type { DropBound } from "../../proto/torrential/proto/core_pb";
import {
DropBoundType,
TorrentialBoundType,
} from "../../proto/torrential/proto/core_pb";
import { logger } from "../../logging";
import type { CertificateBundle } from "../../clients/ca";
import type { GenMessage } from "@bufbuild/protobuf/codegenv2";
interface BaseCallbacks<T> {
resolve: (value: T) => void;
reject: (err: string) => void;
}
type ManifestGenerationCallbacks = BaseCallbacks<string> & {
progress: (v: number) => void;
log: (v: string) => void;
type: "manifest";
};
type CaGenerationCallback = BaseCallbacks<CertificateBundle> & {
type: "certificate";
};
type HasBackendCallback = BaseCallbacks<boolean> & {
type: "has_backend";
};
type ListFilesCallback = BaseCallbacks<string[]> & {
type: "list_files";
};
type PeekFileCallback = BaseCallbacks<number> & {
type: "peek_file";
};
type DropletFunctionCallbacks =
| ManifestGenerationCallbacks
| CaGenerationCallback
| HasBackendCallback
| ListFilesCallback
| PeekFileCallback;
class DropletInterfaceManager {
private callbacks: Map<string, DropletFunctionCallbacks> = new Map();
private queryProcessors: QueryProcessor<
DropBoundType,
TorrentialBoundType,
Message
>[];
constructor() {
// This handler is special, it's a global error handler
const errorProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.RPC_ERROR,
run: async (message, callbacks) => {
const messageData = fromBinary(RpcErrorSchema, message.data);
callbacks.reject(messageData.error);
this.callbacks.delete(message.messageId);
},
});
// Other than the error handler, each "_COMPLETE" handler is responsible
// for resolving the promise, and cleaning themselves up (removing from map)
const manifestCompleteProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.MANIFEST_COMPLETE,
callbackType: "manifest",
run: async (message, callbacks) => {
const messageData = fromBinary(ManifestCompleteSchema, message.data);
callbacks.resolve(messageData.manifest);
this.callbacks.delete(message.messageId);
},
});
const manifestLogProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.MANIFEST_LOG,
callbackType: "manifest",
run: async (message, callbacks) => {
const messageData = fromBinary(ManifestLogSchema, message.data);
callbacks.log(messageData.logLine);
},
});
const manifestProgressProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.MANIFEST_PROGRESS,
callbackType: "manifest",
run: async (message, callbacks) => {
const messageData = fromBinary(ManifestProgressSchema, message.data);
callbacks.progress(messageData.progress);
},
});
const rootCaProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.ROOT_CA_COMPLETE,
callbackType: "certificate",
run: async (message, callbacks) => {
const messageData = fromBinary(RootCertResponseSchema, message.data);
callbacks.resolve({
priv: messageData.priv,
cert: messageData.cert,
} satisfies CertificateBundle);
this.callbacks.delete(message.messageId);
},
});
const clientCertProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.CLIENT_CERT_COMPLETE,
callbackType: "certificate",
run: async (message, callbacks) => {
const messageData = fromBinary(ClientCertResponseSchema, message.data);
callbacks.resolve({
cert: messageData.cert,
priv: messageData.priv,
});
this.callbacks.delete(message.messageId);
},
});
const hasBackendProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.HAS_BACKEND_COMPLETE,
callbackType: "has_backend",
run: async (message, callbacks) => {
const messageData = fromBinary(HasBackendResponseSchema, message.data);
callbacks.resolve(messageData.result);
this.callbacks.delete(message.messageId);
},
});
const listFilesProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.LIST_FILES_COMPLETE,
callbackType: "list_files",
run: async (message, callbacks) => {
const messageData = fromBinary(ListFilesResponseSchema, message.data);
callbacks.resolve(messageData.files);
this.callbacks.delete(message.messageId);
},
});
const peekFileProcessor = this.defineDropletCallbackProcessor({
queryType: DropBoundType.PEEK_FILE_COMPLETE,
callbackType: "peek_file",
run: async (message, callbacks) => {
const messageData = fromBinary(PeekFileResponseSchema, message.data);
callbacks.resolve(Number(messageData.size));
this.callbacks.delete(message.messageId);
},
});
// All query processors go into the array to get added
this.queryProcessors = [
errorProcessor,
manifestCompleteProcessor,
manifestLogProcessor,
manifestProgressProcessor,
rootCaProcessor,
clientCertProcessor,
hasBackendProcessor,
listFilesProcessor,
peekFileProcessor,
];
for (const processor of this.queryProcessors) {
TORRENTIAL_SERVICE.registerProcessor(processor);
}
}
/**
* Defines a handler to consume an incoming message
* from torrential
*
* Passes in the query type (DropBoundType) and callback type,
* to make sure we respond to right callback,
* and give us proper typing when it comes to the callbacks (resolve, specifically)
*
* Returns a query processor that can be registered with the service
*/
private defineDropletCallbackProcessor<
T extends DropBoundType,
K extends TorrentialBoundType,
V extends Message,
C extends DropletFunctionCallbacks,
CT extends C["type"],
>(opts: {
queryType: T;
callbackType?: CT;
run: (
query: DropBound,
callbacks: Extract<C, { type: CT }>,
) => Promise<void>;
}) {
return {
queryType: opts.queryType,
run: async (message) => {
const callbacks = this.callbacks.get(message.messageId);
if (!callbacks) {
logger.warn(
`got a droplet message with old message id: ${message.type}, ${message.messageId}`,
);
return undefined;
}
if (opts.callbackType && callbacks.type !== opts.callbackType)
return undefined;
// eslint-disable-next-line @typescript-eslint/no-explicit-any
await opts.run(message, callbacks as any);
return undefined;
},
} satisfies QueryProcessor<T, K, V>;
}
getProcessors() {
return this.queryProcessors;
}
/**
* Sets up message ID,
* sends request to torrential,
* and sets up callbacks
*/
private async createDropletFunction<
M extends Message,
K extends DropletFunctionCallbacks,
KT extends K["type"],
>(
message: M,
schema: GenMessage<M>,
messageType: TorrentialBoundType,
callbackType: KT,
): Promise<Parameters<Extract<K, { type: KT }>["resolve"]>[0]> {
const messageId = crypto.randomUUID();
await TORRENTIAL_SERVICE.writeMessage(messageId, {
type: messageType,
schema: schema,
data: message,
});
return await new Promise((resolve, reject) => {
this.callbacks.set(messageId, {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type: callbackType as any,
resolve,
reject,
});
});
}
async generateDropletManifest(
versionDir: string,
progress: (v: number) => void,
log: (v: string) => void,
) {
const messageId = crypto.randomUUID();
const manifestGenerationRequest = create(GenerateManifestSchema, {
versionDir,
});
await TORRENTIAL_SERVICE.writeMessage(messageId, {
type: TorrentialBoundType.GENERATE_MANIFEST,
schema: GenerateManifestSchema,
data: manifestGenerationRequest,
});
return await new Promise<string>((resolve, reject) => {
this.callbacks.set(messageId, {
resolve,
reject,
progress,
log,
type: "manifest",
});
});
}
async generateRootCa() {
return await this.createDropletFunction(
create(RootCertQuerySchema, {}),
RootCertQuerySchema,
TorrentialBoundType.GENERATE_ROOT_CA,
"certificate",
);
}
async generateClientCert(
clientId: string,
clientName: string,
rootCa: CertificateBundle,
) {
return await this.createDropletFunction(
create(ClientCertQuerySchema, {
clientId,
clientName,
rootPriv: rootCa.priv,
rootCert: rootCa.cert,
}),
ClientCertQuerySchema,
TorrentialBoundType.GENERATE_CLIENT_CERT,
"certificate",
);
}
async hasBackend(path: string) {
return await this.createDropletFunction(
create(HasBackendQuerySchema, {
path,
}),
HasBackendQuerySchema,
TorrentialBoundType.HAS_BACKEND_QUERY,
"has_backend",
);
}
async listFiles(path: string) {
return await this.createDropletFunction(
create(ListFilesQuerySchema, {
path,
}),
ListFilesQuerySchema,
TorrentialBoundType.LIST_FILES_QUERY,
"list_files",
);
}
async peekFile(path: string, subpath: string) {
return await this.createDropletFunction(
create(PeekFileQuerySchema, {
path: path,
filename: subpath,
}),
PeekFileQuerySchema,
TorrentialBoundType.PEEK_FILE_QUERY,
"peek_file",
);
}
}
export const dropletInterface = new DropletInterfaceManager();
export default dropletInterface;
@@ -0,0 +1,189 @@
import { spawn } from "child_process";
import { Service } from "..";
import fs from "fs";
import { logger } from "../../logging";
import type { Socket } from "net";
import net from "net";
import { create, toBinary, type Message } from "@bufbuild/protobuf";
import { fromBinary } from "@bufbuild/protobuf";
import { StringValueSchema } from "@bufbuild/protobuf/wkt";
import type { GenMessage } from "@bufbuild/protobuf/codegenv2";
import {
DropBoundSchema,
TorrentialBoundSchema,
TorrentialBoundType,
type DropBound,
type DropBoundType,
} from "../../proto/torrential/proto/core_pb";
/// Processors
import manifestFetchProcessor from "./manifest-fetch";
import serverGamesProcessor from "./server-games";
const INTERNAL_DEPOT_URL = new URL(
process.env.INTERNAL_DEPOT_URL ?? "http://localhost:5000",
);
export interface QueryProcessor<
T extends DropBoundType,
K extends TorrentialBoundType,
V extends Message,
> {
queryType: T;
run: (
query: DropBound,
) => Promise<{ type: K; schema: GenMessage<V>; data: V } | undefined>;
}
export class TorrentialService extends Service<unknown> {
private socket: Socket | undefined;
private readbuf: Buffer<ArrayBufferLike> = Buffer.alloc(0);
private readingQueue = false;
private queryProcessors: Map<
DropBoundType,
QueryProcessor<DropBoundType, TorrentialBoundType, Message>
> = new Map();
constructor() {
super(
"torrential",
() => {
const localDir = fs.readdirSync(".");
if ("torrential" in localDir) {
const stat = fs.statSync("./torrential");
if (stat.isDirectory()) {
// in dev and we have the submodule
logger.info(
"torrential detected in development mode - building from source",
);
return spawn(
"cargo run --manifest-path ./torrential/Cargo.toml",
[],
{},
);
} else {
// binary
return spawn("./torrential", [], {});
}
}
const envPath = process.env.TORRENTIAL_PATH;
if (envPath) return spawn(envPath, [], {});
return spawn("torrential", [], {});
},
async () => {
if (this.socket) return true;
this.socket = net.createConnection({ port: 33148, host: "127.0.0.1" });
await new Promise<void>((r) =>
this.socket!.on("connect", () => {
this.logger.info("connected to torrential socket");
r();
}),
);
this.setupRead();
return true;
},
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
async () => await $fetch(`${INTERNAL_DEPOT_URL.toString()}healthcheck`),
{},
);
this.queryProcessors.set(
manifestFetchProcessor.queryType,
manifestFetchProcessor,
);
this.queryProcessors.set(
serverGamesProcessor.queryType,
serverGamesProcessor,
);
}
registerProcessor(
processor: QueryProcessor<DropBoundType, TorrentialBoundType, Message>,
) {
this.queryProcessors.set(processor.queryType, processor);
}
private setupRead() {
if (!this.socket) return;
this.socket.on("data", (data) => {
this.readbuf = Buffer.concat([this.readbuf, data]);
if (!this.readingQueue) {
this.readingQueue = true;
this.queueRead().finally(() => {
this.readingQueue = false;
});
}
});
}
async writeMessage<T extends Message>(
messageId: string,
value: {
type: TorrentialBoundType;
schema: GenMessage<T>;
data: T;
},
) {
const response = create(TorrentialBoundSchema, {
messageId: messageId,
type: value.type,
data: toBinary(value.schema, value.data),
});
const responseBinary = toBinary(TorrentialBoundSchema, response);
const responseLength = responseBinary.length;
const responseLengthBuf = Buffer.allocUnsafe(8);
responseLengthBuf.writeBigUInt64LE(BigInt(responseLength), 0);
this.socket!.write(responseLengthBuf);
this.socket!.write(responseBinary);
}
private async queueRead() {
if (this.readbuf.length < 8) return;
const sizeBytes = this.readbuf.subarray(0, 8);
const size = sizeBytes.readBigUInt64LE(0);
const end = Number(size + BigInt(8));
if (this.readbuf.length < end) return;
const buffer = this.readbuf.subarray(8, end);
this.readbuf = this.readbuf.subarray(end);
const query = fromBinary(DropBoundSchema, buffer);
const processor = this.queryProcessors.get(query.type);
if (!processor) {
this.logger.warn(`no processor for query type: ${query.type}`);
return;
}
let value;
try {
value = await processor.run(query);
} catch (e) {
this.logger.warn(
`process query for ${query.type} failed with error: ${e}`,
);
value = {
type: TorrentialBoundType.ERROR,
schema: StringValueSchema,
data: create(StringValueSchema, {
value: (e as string).toString(),
}),
};
}
if (value) await this.writeMessage(query.messageId, value);
// Call until we can't
await this.queueRead();
}
}
export const TORRENTIAL_SERVICE = new TorrentialService();
export default TORRENTIAL_SERVICE;
@@ -0,0 +1,89 @@
import {
VersionQuerySchema,
VersionResponse_LibrarySource_LibraryBackend,
VersionResponse_LibrarySourceSchema,
VersionResponse_Manifest_ChunkData_FileEntrySchema,
VersionResponse_Manifest_ChunkDataSchema,
VersionResponse_ManifestSchema,
VersionResponseSchema,
} from "../../proto/torrential/proto/version_pb";
import { castManifest } from "../../library/manifest/utils";
import { LibraryBackend } from "~/prisma/client/client";
import { create, fromBinary } from "@bufbuild/protobuf";
import prisma from "../../db/database";
import { defineQueryProcessor } from "./utils";
import {
DropBoundType,
TorrentialBoundType,
} from "../../proto/torrential/proto/core_pb";
export default defineQueryProcessor({
queryType: DropBoundType.VERSION_QUERY,
run: async (query) => {
const queryData = fromBinary(VersionQuerySchema, query.data);
const version = await prisma.gameVersion.findUnique({
where: {
versionId: queryData.versionId,
},
select: {
dropletManifest: true,
versionPath: true,
game: {
select: {
library: true,
libraryPath: true,
},
},
},
});
if (!version) throw "Game version not found";
const manifest = castManifest(version.dropletManifest);
const mapEnum = (v: LibraryBackend) => {
switch (v) {
case LibraryBackend.Filesystem:
return VersionResponse_LibrarySource_LibraryBackend.FILESYSTEM;
case LibraryBackend.FlatFilesystem:
return VersionResponse_LibrarySource_LibraryBackend.FLAT_FILESYSTEM;
}
};
return {
type: TorrentialBoundType.VERSION_RESPONSE,
schema: VersionResponseSchema,
data: create(VersionResponseSchema, {
manifest: create(VersionResponse_ManifestSchema, {
version: manifest.version,
size: BigInt(manifest.size),
key: Buffer.from(manifest.key),
chunks: Object.fromEntries(
Object.entries(manifest.chunks).map(([id, chunk]) => [
id,
create(VersionResponse_Manifest_ChunkDataSchema, {
checksum: chunk.checksum,
iv: Buffer.from(chunk.iv),
files: chunk.files.map((file) =>
create(VersionResponse_Manifest_ChunkData_FileEntrySchema, {
filename: file.filename,
start: BigInt(file.start),
length: BigInt(file.length),
permissions: file.permissions,
}),
),
}),
]),
),
}),
source: create(VersionResponse_LibrarySourceSchema, {
options: JSON.stringify(version.game.library.options),
id: version.game.library.id,
backend: mapEnum(version.game.library.backend),
}),
libraryPath: version.game.libraryPath,
versionPath: version.versionPath!,
}),
};
},
});
@@ -0,0 +1,38 @@
import prisma from "../../db/database";
import { ServerGamesResponseSchema } from "../../proto/torrential/proto/manifest_pb";
import { create } from "@bufbuild/protobuf";
import { defineQueryProcessor } from "./utils";
import {
DropBoundType,
TorrentialBoundType,
} from "../../proto/torrential/proto/core_pb";
export default defineQueryProcessor({
queryType: DropBoundType.SERVER_GAMES_QUERY,
run: async () => {
// const queryData = fromBinary(ServerGamesQuerySchema, query.data);
const games = await prisma.game.findMany({
select: {
id: true,
versions: {
select: {
versionId: true,
},
where: {
versionPath: {
not: null,
},
},
},
},
});
return {
type: TorrentialBoundType.SERVER_GAMES_RESPONSE,
schema: ServerGamesResponseSchema,
data: create(ServerGamesResponseSchema, {
games,
}),
};
},
});
@@ -0,0 +1,15 @@
import type { Message } from "@bufbuild/protobuf";
import type { QueryProcessor } from ".";
import type {
DropBoundType,
TorrentialBoundType,
} from "../../proto/torrential/proto/core_pb";
export function defineQueryProcessor<
T extends DropBoundType,
K extends TorrentialBoundType,
V extends Message,
>(opts: QueryProcessor<T, K, V>) {
// TORRENTIAL_SERVICE.queryProcessors.set(opts.queryType, opts as any);
return opts;
}