feat(notifications): added notification system w/ interwoven refactoring
This commit is contained in:
@@ -0,0 +1,28 @@
|
||||
import prisma from "~/server/internal/db/database";
|
||||
|
||||
export default defineEventHandler(async (h3) => {
|
||||
const userId = await h3.context.session.getUserId(h3);
|
||||
if (!userId) throw createError({ statusCode: 403 });
|
||||
|
||||
const notificationId = getRouterParam(h3, "id");
|
||||
if (!notificationId)
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Missing notification ID",
|
||||
});
|
||||
|
||||
const notification = await prisma.notification.delete({
|
||||
where: {
|
||||
id: notificationId,
|
||||
userId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!notification)
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Invalid notification ID",
|
||||
});
|
||||
|
||||
return {};
|
||||
});
|
||||
@@ -0,0 +1,28 @@
|
||||
import prisma from "~/server/internal/db/database";
|
||||
|
||||
export default defineEventHandler(async (h3) => {
|
||||
const userId = await h3.context.session.getUserId(h3);
|
||||
if (!userId) throw createError({ statusCode: 403 });
|
||||
|
||||
const notificationId = getRouterParam(h3, "id");
|
||||
if (!notificationId)
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Missing notification ID",
|
||||
});
|
||||
|
||||
const notification = await prisma.notification.findFirst({
|
||||
where: {
|
||||
id: notificationId,
|
||||
userId,
|
||||
},
|
||||
});
|
||||
|
||||
if (!notification)
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Invalid notification ID",
|
||||
});
|
||||
|
||||
return notification;
|
||||
});
|
||||
@@ -0,0 +1,31 @@
|
||||
import prisma from "~/server/internal/db/database";
|
||||
|
||||
export default defineEventHandler(async (h3) => {
|
||||
const userId = await h3.context.session.getUserId(h3);
|
||||
if (!userId) throw createError({ statusCode: 403 });
|
||||
|
||||
const notificationId = getRouterParam(h3, "id");
|
||||
if (!notificationId)
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Missing notification ID",
|
||||
});
|
||||
|
||||
const notification = await prisma.notification.update({
|
||||
where: {
|
||||
id: notificationId,
|
||||
userId,
|
||||
},
|
||||
data: {
|
||||
read: true,
|
||||
},
|
||||
});
|
||||
|
||||
if (!notification)
|
||||
throw createError({
|
||||
statusCode: 400,
|
||||
statusMessage: "Invalid notification ID",
|
||||
});
|
||||
|
||||
return notification;
|
||||
});
|
||||
@@ -0,0 +1,17 @@
|
||||
import prisma from "~/server/internal/db/database";
|
||||
|
||||
export default defineEventHandler(async (h3) => {
|
||||
const userId = await h3.context.session.getUserId(h3);
|
||||
if (!userId) throw createError({ statusCode: 403 });
|
||||
|
||||
const notifications = await prisma.notification.findMany({
|
||||
where: {
|
||||
userId,
|
||||
},
|
||||
orderBy: {
|
||||
created: "desc", // Newest first
|
||||
},
|
||||
});
|
||||
|
||||
return notifications;
|
||||
});
|
||||
@@ -0,0 +1,17 @@
|
||||
import prisma from "~/server/internal/db/database";
|
||||
|
||||
export default defineEventHandler(async (h3) => {
|
||||
const userId = await h3.context.session.getUserId(h3);
|
||||
if (!userId) throw createError({ statusCode: 403 });
|
||||
|
||||
await prisma.notification.updateMany({
|
||||
where: {
|
||||
userId,
|
||||
},
|
||||
data: {
|
||||
read: true,
|
||||
},
|
||||
});
|
||||
|
||||
return;
|
||||
});
|
||||
@@ -0,0 +1,42 @@
|
||||
import notificationSystem from "~/server/internal/notifications";
|
||||
import session from "~/server/internal/session";
|
||||
import { parse as parseCookies } from "cookie-es";
|
||||
|
||||
// TODO add web socket sessions for horizontal scaling
|
||||
// Peer ID to user ID
|
||||
const socketSessions: { [key: string]: string } = {};
|
||||
|
||||
export default defineWebSocketHandler({
|
||||
async open(peer) {
|
||||
const cookies = peer.request?.headers?.get("Cookie");
|
||||
if (!cookies) {
|
||||
peer.send("unauthenticated");
|
||||
return;
|
||||
}
|
||||
|
||||
const parsedCookies = parseCookies(cookies);
|
||||
const token = parsedCookies[session.getDropTokenCookie()];
|
||||
|
||||
const userId = await session.getUserIdRaw(token);
|
||||
if (!userId) {
|
||||
peer.send("unauthenticated");
|
||||
return;
|
||||
}
|
||||
|
||||
socketSessions[peer.id] = userId;
|
||||
|
||||
notificationSystem.listen(userId, peer.id, (notification) => {
|
||||
peer.send(JSON.stringify(notification));
|
||||
});
|
||||
},
|
||||
async close(peer, details) {
|
||||
const userId = socketSessions[peer.id];
|
||||
if (!userId) {
|
||||
console.log(`skipping websocket close for ${peer.id}`);
|
||||
return;
|
||||
}
|
||||
|
||||
notificationSystem.unlisten(userId, peer.id);
|
||||
delete socketSessions[peer.id];
|
||||
},
|
||||
});
|
||||
@@ -1,6 +1,4 @@
|
||||
import { H3Event } from "h3";
|
||||
import session from "~/server/internal/session";
|
||||
import { v4 as uuidv4 } from "uuid";
|
||||
import taskHandler, { TaskMessage } from "~/server/internal/tasks";
|
||||
import { parse as parseCookies } from "cookie-es";
|
||||
|
||||
@@ -24,7 +22,7 @@ export default defineWebSocketHandler({
|
||||
peer.send("unauthenticated");
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
const admin = session.getAdminUser(token);
|
||||
adminSocketSessions[peer.id] = admin !== undefined;
|
||||
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
/*
|
||||
The notification system handles the recieving, creation and sending of notifications in Drop
|
||||
|
||||
Design goals:
|
||||
1. Nonce-based notifications; notifications should only be created once
|
||||
2. Real-time; use websocket listeners to keep clients up-to-date
|
||||
*/
|
||||
|
||||
import { Notification } from "@prisma/client";
|
||||
import prisma from "../db/database";
|
||||
|
||||
export type NotificationCreateArgs = Pick<
|
||||
Notification,
|
||||
"title" | "description" | "actions" | "nonce"
|
||||
>;
|
||||
|
||||
class NotificationSystem {
|
||||
private listeners: {
|
||||
[key: string]: Map<string, (notification: Notification) => any>;
|
||||
} = {};
|
||||
|
||||
listen(
|
||||
userId: string,
|
||||
id: string,
|
||||
callback: (notification: Notification) => any
|
||||
) {
|
||||
this.listeners[userId] ??= new Map();
|
||||
this.listeners[userId].set(id, callback);
|
||||
|
||||
this.catchupListener(userId, id);
|
||||
}
|
||||
|
||||
unlisten(userId: string, id: string) {
|
||||
this.listeners[userId].delete(id);
|
||||
}
|
||||
|
||||
private async catchupListener(userId: string, id: string) {
|
||||
const callback = this.listeners[userId].get(id);
|
||||
if (!callback)
|
||||
throw new Error("Failed to catch-up listener: callback does not exist");
|
||||
const notifications = await prisma.notification.findMany({
|
||||
where: { userId: userId },
|
||||
orderBy: {
|
||||
created: "asc", // Oldest first, because they arrive in reverse order
|
||||
},
|
||||
});
|
||||
for (const notification of notifications) {
|
||||
await callback(notification);
|
||||
}
|
||||
}
|
||||
|
||||
private async pushNotification(userId: string, notification: Notification) {
|
||||
for (const listener of this.listeners[userId] ?? []) {
|
||||
await listener[1](notification);
|
||||
}
|
||||
}
|
||||
|
||||
async push(userId: string, notificationCreateArgs: NotificationCreateArgs) {
|
||||
const notification = await prisma.notification.create({
|
||||
data: {
|
||||
userId: userId,
|
||||
...notificationCreateArgs,
|
||||
},
|
||||
});
|
||||
|
||||
await this.pushNotification(userId, notification);
|
||||
}
|
||||
|
||||
async pushAll(notificationCreateArgs: NotificationCreateArgs) {
|
||||
const users = await prisma.user.findMany({
|
||||
where: { id: { not: "system" } },
|
||||
select: {
|
||||
id: true,
|
||||
},
|
||||
});
|
||||
|
||||
for (const user of users) {
|
||||
await this.push(user.id, notificationCreateArgs);
|
||||
}
|
||||
}
|
||||
|
||||
async systemPush(notificationCreateArgs: NotificationCreateArgs) {
|
||||
return await this.push("system", notificationCreateArgs);
|
||||
}
|
||||
}
|
||||
|
||||
export const notificationSystem = new NotificationSystem();
|
||||
export default notificationSystem;
|
||||
Reference in New Issue
Block a user