diff --git a/package-lock.json b/package-lock.json index 2e50baf..9add4fd 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6592,6 +6592,12 @@ "resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.5.tgz", "integrity": "sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==" }, + "node_modules/@types/uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==", + "dev": true + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "5.62.0", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.62.0.tgz", @@ -16480,6 +16486,7 @@ "rate-limit-redis": "^4.2.0", "redis": "^4.6.12", "socket.io": "^4.7.2", + "uuid": "^10.0.0", "winston": "^3.11.0" }, "devDependencies": { @@ -16487,6 +16494,7 @@ "@types/cors": "^2.8.17", "@types/express": "^4.17.17", "@types/express-session": "^1.17.7", + "@types/uuid": "^10.0.0", "@typescript-eslint/eslint-plugin": "^7.1.0", "@typescript-eslint/parser": "^7.1.0", "dotenv": "^16.3.1", @@ -16783,6 +16791,18 @@ "engines": { "node": ">= 0.8" } + }, + "packages/server/node_modules/uuid": { + "version": "10.0.0", + "resolved": "https://registry.npmjs.org/uuid/-/uuid-10.0.0.tgz", + "integrity": "sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==", + "funding": [ + "https://github.com/sponsors/broofa", + "https://github.com/sponsors/ctavan" + ], + "bin": { + "uuid": "dist/bin/uuid" + } } } } diff --git a/packages/client/src/lib/canvas.ts b/packages/client/src/lib/canvas.ts index 4e40448..e11e595 100644 --- a/packages/client/src/lib/canvas.ts +++ b/packages/client/src/lib/canvas.ts @@ -102,10 +102,25 @@ export class Canvas extends EventEmitter { if (Object.keys(this.pixels).length > 0) Network.clearPreviousState("canvas"); - Network.waitForState("canvas").then(([pixels]) => { - console.log("loadConfig just received new canvas data"); - this.handleBatch(pixels); + // Network.waitForState("canvas").then(([pixels]) => { + // console.log("loadConfig just received new canvas data"); + // this.handleBatch(pixels); + // }); + + Network.on("canvas", (start, end, pixels) => { + console.log("[Canvas] received canvas section"); + this.handleBatch(start, end, pixels); }); + + const chunks = Network.getCanvasChunks(); + console.log(`[Canvas] Received ${chunks.length} chunks to load`); + let loaded = 0; + for (const chunk of chunks) { + console.log(`[Canvas] Loading canvas chunk ${loaded}...`); + this.handleBatch(chunk.start, chunk.end, chunk.pixels); + + loaded++; + } } hasConfig() { @@ -267,27 +282,36 @@ export class Canvas extends EventEmitter { getRenderer().usePixels(serializeBuild); }; - handleBatch = (pixels: string[]) => { + handleBatch = ( + start: [x: number, y: number], + end: [x: number, y: number], + pixels: string[] + ) => { if (!this.config.canvas) { throw new Error("handleBatch called with no config"); } let serializeBuild: CanvasPixel[] = []; + const width = end[0] - start[0]; + const height = end[1] - start[1]; - for (let x = 0; x < this.config.canvas.size[0]; x++) { - for (let y = 0; y < this.config.canvas.size[1]; y++) { - const hex = pixels[this.config.canvas.size[0] * y + x]; + for (let x = 0; x < width; x++) { + for (let y = 0; y < height; y++) { + const hex = pixels[width * y + x]; const palette = this.Pallete.getColorFromHex(hex); + const canvasX = x + start[0]; + const canvasY = y + start[1]; + // we still store a copy of the pixels in this instance for non-rendering functions - this.pixels[x + "_" + y] = { + this.pixels[canvasX + "_" + canvasY] = { type: "full", color: palette?.id || -1, }; serializeBuild.push({ - x, - y, + x: canvasX, + y: canvasY, hex: hex === "transparent" ? "null" : hex, }); } diff --git a/packages/client/src/lib/canvasRenderer.ts b/packages/client/src/lib/canvasRenderer.ts index a0e4fc8..97be57e 100644 --- a/packages/client/src/lib/canvasRenderer.ts +++ b/packages/client/src/lib/canvasRenderer.ts @@ -8,8 +8,6 @@ export type CanvasPixel = { hex: string; }; -const bezier = (n: number) => n * n * (3 - 2 * n); - const isWorker = () => { return ( // @ts-ignore @@ -41,8 +39,18 @@ export class CanvasRenderer extends EventEmitter { private blank?: RCanvas; private blank_ctx?: RContext; - private pixels: CanvasPixel[] = []; - private allPixels: CanvasPixel[] = []; + /** + * Pixels that need to be drawn next draw call + * + * Key = x,y (eg 0,0) + */ + private pixels: Map = new Map(); + /** + * Every pixel + * + * Key = x,y (eg 0,0) + */ + private allPixels: Map = new Map(); private isWorker = isWorker(); private _stopRender = false; @@ -87,37 +95,15 @@ export class CanvasRenderer extends EventEmitter { } usePixels(pixels: CanvasPixel[], replace = false) { - if (replace) { - this.pixels = pixels; - this.allPixels = pixels; - } else { - for (const pixel of pixels) { - this.usePixel(pixel); - } + for (const pixel of pixels) { + this.usePixel(pixel); } } usePixel(pixel: CanvasPixel) { - { - let existing = this.pixels.find( - (p) => p.x === pixel.x && p.y === pixel.y - ); - if (existing) { - this.pixels.splice(this.pixels.indexOf(existing), 1); - } - } - - { - let existing = this.allPixels.find( - (p) => p.x === pixel.x && p.y === pixel.y - ); - if (existing) { - this.allPixels.splice(this.allPixels.indexOf(existing), 1); - } - } - - this.pixels.push(pixel); - this.allPixels.push(pixel); + let key = pixel.x + "," + pixel.y; + this.pixels.set(key, pixel.hex); + this.allPixels.set(key, pixel.hex); } startRender() { @@ -179,16 +165,19 @@ export class CanvasRenderer extends EventEmitter { draw() { const start = performance.now(); - const pixels = [...this.pixels]; - this.pixels = []; + const pixels = new Map(this.pixels); + this.pixels.clear(); - if (pixels.length) { - console.log("[CanvasRenderer#draw] drawing " + pixels.length + " pixels"); + if (pixels.size) { + console.log("[CanvasRenderer#draw] drawing " + pixels.size + " pixels"); } - for (const pixel of pixels) { - this.ctx.fillStyle = pixel.hex === "null" ? "#fff" : "#" + pixel.hex; - this.ctx.fillRect(pixel.x, pixel.y, 1, 1); + for (const [x_y, hex] of pixels) { + const x = parseInt(x_y.split(",")[0]); + const y = parseInt(x_y.split(",")[1]); + + this.ctx.fillStyle = hex === "null" ? "#fff" : "#" + hex; + this.ctx.fillRect(x, y, 1, 1); } const diff = performance.now() - start; @@ -219,9 +208,12 @@ export class CanvasRenderer extends EventEmitter { this.ctx.fillStyle = "#fff"; this.ctx.fillRect(0, 0, this.canvas.width, this.canvas.height); - for (const pixel of this.allPixels) { - this.ctx.fillStyle = pixel.hex === "null" ? "#fff" : "#" + pixel.hex; - this.ctx.fillRect(pixel.x, pixel.y, 1, 1); + for (const [x_y, hex] of this.allPixels) { + const x = parseInt(x_y.split(",")[0]); + const y = parseInt(x_y.split(",")[1]); + + this.ctx.fillStyle = hex === "null" ? "#fff" : "#" + hex; + this.ctx.fillRect(x, y, 1, 1); } } @@ -234,11 +226,13 @@ export class CanvasRenderer extends EventEmitter { ctx.clearRect(0, 0, canvas.width, canvas.height); - for (const pixel of this.allPixels) { - if (pixel.hex !== "null") continue; + for (const [x_y, hex] of this.allPixels) { + if (hex !== "null") continue; + const x = parseInt(x_y.split(",")[0]); + const y = parseInt(x_y.split(",")[1]); ctx.fillStyle = "rgba(0,140,0,0.5)"; - ctx.fillRect(pixel.x, pixel.y, 1, 1); + ctx.fillRect(x, y, 1, 1); } } } diff --git a/packages/client/src/lib/network.ts b/packages/client/src/lib/network.ts index 2c0180f..4143fef 100644 --- a/packages/client/src/lib/network.ts +++ b/packages/client/src/lib/network.ts @@ -20,7 +20,11 @@ export interface INetworkEvents { user: (user: AuthSession) => void; standing: (standing: IAccountStanding) => void; config: (user: ClientConfig) => void; - canvas: (pixels: string[]) => void; + canvas: ( + start: [x: number, y: number], + end: [x: number, y: number], + pixels: string[] + ) => void; pixels: (data: { available: number }) => void; pixelLastPlaced: (time: number) => void; online: (count: number) => void; @@ -55,6 +59,12 @@ class Network extends EventEmitter { [key in keyof INetworkEvents]?: SentEventValue; } = {}; + private canvasChunks: { + start: [number, number]; + end: [number, number]; + pixels: string[]; + }[] = []; + constructor() { super(); @@ -123,8 +133,14 @@ class Network extends EventEmitter { this.emit("config", config); }); - this.socket.on("canvas", (pixels) => { - this.acceptState("canvas", pixels); + this.socket.on("canvas", (start, end, pixels) => { + // this.acceptState("canvas", start, end, pixels); + this.emit("canvas", start, end, pixels); + this.canvasChunks.push({ start, end, pixels }); + }); + + this.socket.on("clearCanvasChunks", () => { + this.canvasChunks = []; }); this.socket.on("availablePixels", (count) => { @@ -191,6 +207,10 @@ class Network extends EventEmitter { delete this.stateEvents[ev]; } + getCanvasChunks() { + return this.canvasChunks; + } + /** * Wait for event, either being already sent, or new one * diff --git a/packages/lib/src/net.ts b/packages/lib/src/net.ts index 6605058..01d218d 100644 --- a/packages/lib/src/net.ts +++ b/packages/lib/src/net.ts @@ -3,7 +3,12 @@ export type Subscription = "heatmap"; export interface ServerToClientEvents { - canvas: (pixels: string[]) => void; + canvas: ( + start: [x: number, y: number], + end: [x: number, y: number], + pixels: string[] + ) => void; + clearCanvasChunks: () => void; user: (user: AuthSession) => void; standing: (standing: IAccountStanding) => void; config: (config: ClientConfig) => void; diff --git a/packages/server/package.json b/packages/server/package.json index 89a447b..33e701d 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -20,6 +20,7 @@ "@types/cors": "^2.8.17", "@types/express": "^4.17.17", "@types/express-session": "^1.17.7", + "@types/uuid": "^10.0.0", "@typescript-eslint/eslint-plugin": "^7.1.0", "@typescript-eslint/parser": "^7.1.0", "dotenv": "^16.3.1", @@ -44,6 +45,7 @@ "rate-limit-redis": "^4.2.0", "redis": "^4.6.12", "socket.io": "^4.7.2", + "uuid": "^10.0.0", "winston": "^3.11.0" } } diff --git a/packages/server/src/api/client.ts b/packages/server/src/api/client.ts index 0c0ba64..01d04a2 100644 --- a/packages/server/src/api/client.ts +++ b/packages/server/src/api/client.ts @@ -6,6 +6,8 @@ import { getLogger } from "../lib/Logger"; import Canvas from "../lib/Canvas"; import { RateLimiter } from "../lib/RateLimiter"; import { Instance } from "../models/Instance"; +import { callCacheWorker } from "../workers/worker"; +import { Redis } from "../lib/redis"; const Logger = getLogger("HTTP/CLIENT"); diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 7a9fcdd..48bcd80 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -7,6 +7,7 @@ import { SocketServer } from "./lib/SocketServer"; import { OpenID } from "./lib/oidc"; import { loadSettings } from "./lib/Settings"; import "./workers/worker"; +import { spawnCacheWorkers } from "./workers/worker"; const Logger = getLogger("MAIN"); @@ -85,12 +86,17 @@ if (!process.env.PIXEL_LOG_PATH) { Logger.warn("PIXEL_LOG_PATH is not defined, defaulting to packages/server"); } +if (!process.env.CACHE_WORKERS) { + Logger.warn("CACHE_WORKERS is not defined, defaulting to 1 worker"); +} + // run startup tasks, all of these need to be completed to serve Promise.all([ Redis.getClient(), OpenID.setup().then(() => { Logger.info("Setup OpenID"); }), + spawnCacheWorkers(), loadSettings(), ]).then(() => { Logger.info("Startup tasks have completed, starting server"); diff --git a/packages/server/src/lib/Canvas.ts b/packages/server/src/lib/Canvas.ts index d73ddb8..3973c5d 100644 --- a/packages/server/src/lib/Canvas.ts +++ b/packages/server/src/lib/Canvas.ts @@ -1,14 +1,26 @@ -import { CanvasConfig } from "@sc07-canvas/lib/src/net"; +import { + CanvasConfig, + ClientToServerEvents, + ServerToClientEvents, +} from "@sc07-canvas/lib/src/net"; import { prisma } from "./prisma"; import { Redis } from "./redis"; import { SocketServer } from "./SocketServer"; import { getLogger } from "./Logger"; import { Pixel } from "@prisma/client"; -import { CanvasWorker, callWorkerMethod } from "../workers/worker"; +import { + callCacheWorker, + callWorkerMethod, + getCacheWorkerIdForCoords, + getCanvasCacheWorker, +} from "../workers/worker"; import { LogMan } from "./LogMan"; +import { Socket } from "socket.io"; const Logger = getLogger("CANVAS"); +const canvasSectionSize = [100, 100]; // TODO: config maybe? + class Canvas { /** * Size of the canvas @@ -68,14 +80,10 @@ class Canvas { /** * Change size of the canvas * - * Expensive task, will take a bit - * * @param width * @param height */ async setSize(width: number, height: number, useStatic = false) { - CanvasWorker.postMessage({ type: "canvasSize", width, height }); - if (useStatic) { this.canvasSize = [width, height]; return; @@ -100,7 +108,7 @@ class Canvas { }, }); - // the redis key is 1D, since the dimentions changed we need to update it + // update cached canvas chunks await this.canvasToRedis(); // this gets called on startup, before the SocketServer is initialized @@ -109,10 +117,9 @@ class Canvas { // announce the new config, which contains the canvas size SocketServer.instance.broadcastConfig(); - // announce new pixel array that was generated previously - await this.getPixelsArray().then((pixels) => { - SocketServer.instance?.io.emit("canvas", pixels); - }); + // announce all canvas chunks + SocketServer.instance.io.emit("clearCanvasChunks"); + await this.broadcastCanvasChunks(); } else { Logger.warn( "[Canvas#setSize] No SocketServer instance, cannot broadcast config change" @@ -126,6 +133,54 @@ class Canvas { ); } + async sendCanvasChunksToSocket( + socket: Socket + ) { + await this.getAllChunks((start, end, data) => { + socket.emit("canvas", start, end, data.split(",")); + }); + } + + async broadcastCanvasChunks() { + await this.getAllChunks((start, end, data) => { + SocketServer.instance.io.emit("canvas", start, end, data.split(",")); + }); + } + + async getAllChunks( + chunkCallback: ( + start: [x: number, y: number], + end: [x: number, y: number], + data: string + ) => any + ) { + const redis = await Redis.getClient(); + let pending: Promise[] = []; + + for (let x = 0; x < this.canvasSize[0]; x += canvasSectionSize[0]) { + for (let y = 0; y < this.canvasSize[1]; y += canvasSectionSize[1]) { + const start: [number, number] = [x, y]; + const end: [number, number] = [ + x + canvasSectionSize[0], + y + canvasSectionSize[1], + ]; + + pending.push( + new Promise((res) => { + redis + .get(Redis.key("canvas_section", start, end)) + .then((value): any => { + chunkCallback(start, end, value || ""); + res(); + }); + }) + ); + } + } + + await Promise.allSettled(pending); + } + async forceUpdatePixelIsTop() { const now = Date.now(); Logger.info("[Canvas#forceUpdatePixelIsTop] is starting..."); @@ -210,67 +265,65 @@ class Canvas { } /** - * Converts database pixels to Redis string + * Chunks canvas pixels and caches chunks in redis * * @worker * @returns */ - canvasToRedis(): Promise { - return new Promise((res) => { - Logger.info("Triggering canvasToRedis()"); - const [width, height] = this.getCanvasConfig().size; + async canvasToRedis(): Promise { + const start = Date.now(); - CanvasWorker.once("message", (msg) => { - if (msg.type === "canvasToRedis") { - Logger.info("Finished canvasToRedis()"); - res(msg.data); - } - }); + let pending: Promise[] = []; - CanvasWorker.postMessage({ - type: "canvasToRedis", - width, - height, - }); - }); + for (let x = 0; x < this.canvasSize[0]; x += canvasSectionSize[0]) { + for (let y = 0; y < this.canvasSize[1]; y += canvasSectionSize[1]) { + pending.push( + callCacheWorker("cache", { + start: [x, y], + end: [x + canvasSectionSize[0], y + canvasSectionSize[1]], + }) + ); + } + } + + await Promise.allSettled(pending); + Logger.info( + `Finished canvasToRedis() in ${((Date.now() - start) / 1000).toFixed(2)}s` + ); } /** * force an update at a specific position */ async updateCanvasRedisAtPos(x: number, y: number) { + const redis = await Redis.getClient(); const dbpixel = await this.getPixel(x, y); - await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", { - x, - y, - hex: dbpixel?.color || "transparent", - }); + // ensure pixels in the same location are always in the same queue + const workerId = getCacheWorkerIdForCoords(x, y); + + // queue canvas writes in redis to avoid memory issues in badly written queue code + redis.lPush( + Redis.key("canvas_cache_write_queue", workerId), + x + "," + y + "," + (dbpixel?.color || "transparent") + ); } async updateCanvasRedisWithBatch( pixelBatch: { x: number; y: number; hex: string }[] ) { - await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", { - batch: pixelBatch, - }); - } - - async isPixelArrayCached() { const redis = await Redis.getClient(); - return await redis.exists(Redis.key("canvas")); - } + for (const pixel of pixelBatch) { + // ensure pixels in the same location are always in the same queue + const workerId = getCacheWorkerIdForCoords(pixel.x, pixel.y); - async getPixelsArray() { - const redis = await Redis.getClient(); - - if (await redis.exists(Redis.key("canvas"))) { - const cached = await redis.get(Redis.key("canvas")); - return cached!.split(","); + // queue canvas writes in redis + redis.lPush( + Redis.key("canvas_cache_write_queue", workerId), + [pixel.x, pixel.y, pixel.hex].join(",") + ); } - - return await this.canvasToRedis(); } /** @@ -468,7 +521,6 @@ class Canvas { */ async generateHeatmap() { const redis_set = await Redis.getClient("MAIN"); - const redis_sub = await Redis.getClient("SUB"); const now = Date.now(); const minimumDate = new Date(); diff --git a/packages/server/src/lib/Logger.ts b/packages/server/src/lib/Logger.ts index ec9261a..6ff7a52 100644 --- a/packages/server/src/lib/Logger.ts +++ b/packages/server/src/lib/Logger.ts @@ -12,12 +12,17 @@ const formatter = format.printf((options) => { maxModuleWidth = Math.max(maxModuleWidth, `[${module}]`.length); } + let moduleName = options.moduleName; + if (typeof options.workerId !== "undefined") { + moduleName += " #" + options.workerId; + } + let modulePadding = " ".repeat( - Math.max(0, maxModuleWidth - `[${options.moduleName}]`.length) + Math.max(0, maxModuleWidth - `[${moduleName}]`.length) ); let parts: string[] = [ - options.timestamp + ` [${options.moduleName || "---"}]` + modulePadding, + options.timestamp + ` [${moduleName || "---"}]` + modulePadding, options.level + ":", options.message, ]; @@ -58,5 +63,7 @@ export const LoggerType = createEnum([ "RECAPTCHA", ]); -export const getLogger = (module?: keyof typeof LoggerType) => - Winston.child({ moduleName: module }); +export const getLogger = ( + module?: keyof typeof LoggerType, + workerId?: number +) => Winston.child({ moduleName: module, workerId }); diff --git a/packages/server/src/lib/Prometheus.ts b/packages/server/src/lib/Prometheus.ts index a82c793..5324592 100644 --- a/packages/server/src/lib/Prometheus.ts +++ b/packages/server/src/lib/Prometheus.ts @@ -4,6 +4,7 @@ import e from "express"; import { SocketServer } from "./SocketServer"; import Canvas from "./Canvas"; import { Redis } from "./redis"; +import { CACHE_WORKERS, getCacheWorkerQueueLength } from "../workers/worker"; client.collectDefaultMetrics({ labels: process.env.NODE_APP_INSTANCE @@ -87,6 +88,34 @@ export const TotalPixels = new client.Gauge({ }, }); +const CacheWorkerQueueMain = new client.Gauge({ + name: "cache_worker_callback_queue_main", + help: "cache worker callback queue length for main process", + + collect() { + this.set(getCacheWorkerQueueLength()); + }, +}); + +const CacheWorkerQueueWorkers = new client.Gauge({ + name: "cache_worker_queue_workers", + help: "cache worker write queue length per worker process", + labelNames: ["worker_id"], + + async collect() { + const redis = await Redis.getClient(); + + for (let i = 0; i < CACHE_WORKERS; i++) { + this.set( + { + worker_id: i, + }, + await redis.lLen(Redis.key("canvas_cache_write_queue", i)) + ); + } + }, +}); + export const handleMetricsEndpoint = async ( req: e.Request, res: e.Response diff --git a/packages/server/src/lib/SocketServer.ts b/packages/server/src/lib/SocketServer.ts index defa799..c9ab344 100644 --- a/packages/server/src/lib/SocketServer.ts +++ b/packages/server/src/lib/SocketServer.ts @@ -165,6 +165,7 @@ export class SocketServer { user?.sockets.add(socket); Logger.debug("handleConnection " + user?.sockets.size); + socket.emit("clearCanvasChunks"); Redis.getClient().then((redis) => { if (user) redis.set(Redis.key("socketToSub", socket.id), user.sub); @@ -197,29 +198,12 @@ export class SocketServer { socket.emit("config", getClientConfig()); { - let _clientNotifiedAboutCache = false; - Canvas.isPixelArrayCached().then((cached) => { - if (!cached) { - _clientNotifiedAboutCache = true; - socket.emit("alert", { - id: "canvas_cache_pending", - is: "toast", - action: "system", - severity: "info", - title: "Canvas loading", - body: "Canvas not cached, this may take a couple seconds", - autoDismiss: true, - }); - } - }); - Canvas.getPixelsArray().then((pixels) => { - socket.emit("canvas", pixels); - socket.emit("alert_dismiss", "canvas_cache_pending"); + Canvas.sendCanvasChunksToSocket(socket).then(() => { socket.emit("alert", { is: "toast", action: "system", severity: "success", - title: "Canvas loaded!", + title: "Canvas loaded", autoDismiss: true, }); }); diff --git a/packages/server/src/lib/redis.ts b/packages/server/src/lib/redis.ts index dd56a62..c52cea7 100644 --- a/packages/server/src/lib/redis.ts +++ b/packages/server/src/lib/redis.ts @@ -9,8 +9,13 @@ const Logger = getLogger("REDIS"); */ interface IRedisKeys { // canvas - canvas(): string; + // canvas(): string; heatmap(): string; + canvas_section( + start: [x: number, y: number], + end: [x: number, y: number] + ): string; + canvas_cache_write_queue(workerId: number): string; // users socketToSub(socketId: string): string; @@ -23,8 +28,11 @@ interface IRedisKeys { * Defined as a variable due to boottime augmentation */ const RedisKeys: IRedisKeys = { - canvas: () => `CANVAS:PIXELS`, + // canvas: () => `CANVAS:PIXELS`, heatmap: () => `CANVAS:HEATMAP`, + canvas_section: (start, end) => + `CANVAS:PIXELS:${start.join(",")}:${end.join(",")}`, + canvas_cache_write_queue: (workerId) => `CANVAS:CACHE_QUEUE:${workerId}`, socketToSub: (socketId: string) => `CANVAS:SOCKET:${socketId}`, channel_heatmap: () => `CANVAS:HEATMAP`, }; diff --git a/packages/server/src/types.ts b/packages/server/src/types.ts index e7f7c31..7e3a725 100644 --- a/packages/server/src/types.ts +++ b/packages/server/src/types.ts @@ -62,12 +62,14 @@ declare global { MATRIX_GENERAL_ALIAS: string; PIXEL_LOG_PATH?: string; - + RECAPTCHA_SITE_KEY?: string; RECAPTCHA_SECRET_KEY?: string; RECAPTCHA_PIXEL_CHANCE?: string; DISCORD_WEBHOOK?: string; + + CACHE_WORKERS?: string; } } } diff --git a/packages/server/src/workers/canvas.ts b/packages/server/src/workers/canvas.ts deleted file mode 100644 index 3a9dd85..0000000 --- a/packages/server/src/workers/canvas.ts +++ /dev/null @@ -1,157 +0,0 @@ -import { parentPort } from "node:worker_threads"; -import { Redis } from "../lib/redis"; -import { prisma } from "../lib/prisma"; -import { getLogger } from "../lib/Logger"; -import { Pixel } from "@prisma/client"; - -type Message = - | { type: "canvasSize"; width: number; height: number } - | { - type: "canvasToRedis"; - } - | { - type: "updateCanvasRedisAtPos"; - callbackId: number; - x: number; - y: number; - hex: string | "transparent"; - } - | { - type: "updateCanvasRedisWithBatch"; - callbackId: number; - batch: { x: number; y: number; hex: string }[]; - }; - -const Logger = getLogger("CANVAS_WORK"); - -/** - * We run the connection directly instead of via class functions to prevent side effects - */ -const redis = Redis.client; -redis.connect().then(() => { - Logger.info("Connected to Redis"); -}); - -const queuedCanvasRedis: { - id: number; - pixels: { x: number; y: number; hex: string | "transparent" }[]; -}[] = []; - -let canvasSize = { width: -1, height: -1 }; - -parentPort?.on("message", (msg: Message) => { - switch (msg.type) { - case "canvasSize": - canvasSize = { width: msg.width, height: msg.height }; - Logger.info("Received canvasSize " + JSON.stringify(canvasSize)); - break; - case "canvasToRedis": - if (canvasSize.width === -1 || canvasSize.height === -1) { - Logger.error("Received canvasToRedis but i do not have the dimentions"); - return; - } - - canvasToRedis(canvasSize.width, canvasSize.height).then((str) => { - parentPort?.postMessage({ type: "canvasToRedis", data: str }); - }); - break; - case "updateCanvasRedisAtPos": - queuedCanvasRedis.push({ - id: msg.callbackId, - pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }], - }); - startCanvasRedisIfNeeded(); - break; - case "updateCanvasRedisWithBatch": - queuedCanvasRedis.push({ - id: msg.callbackId, - pixels: msg.batch, - }); - startCanvasRedisIfNeeded(); - break; - } -}); - -const execCallback = (id: number) => { - parentPort?.postMessage({ type: "callback", callbackId: id }); -}; - -/** - * Convert database pixels to Redis cache - * - * This does not depend on the Canvas class and can be ran inside the worker - * - * @param width - * @param height - * @returns - */ -const canvasToRedis = async (width: number, height: number) => { - const now = Date.now(); - Logger.info("Starting canvasToRedis..."); - - const dbpixels = await prisma.pixel.findMany({ - where: { - x: { - gte: 0, - lt: width, - }, - y: { - gte: 0, - lt: height, - }, - isTop: true, - }, - }); - - const pixels: string[] = []; - - // (y -> x) because of how the conversion needs to be done later - // if this is inverted, the map will flip when rebuilding the cache (5 minute expiry) - // fixes #24 - for (let y = 0; y < height; y++) { - for (let x = 0; x < width; x++) { - pixels.push( - dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent" - ); - } - } - - await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); - - Logger.info( - "Finished canvasToRedis in " + ((Date.now() - now) / 1000).toFixed(2) + "s" - ); - return pixels; -}; - -let isCanvasRedisWorking = false; - -const startCanvasRedisIfNeeded = () => { - if (isCanvasRedisWorking) return; - - tickCanvasRedis(); -}; - -const tickCanvasRedis = async () => { - isCanvasRedisWorking = true; - - const item = queuedCanvasRedis.shift(); - if (!item) { - isCanvasRedisWorking = false; - return; - } - - const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split( - "," - ); - - for (const pixel of item.pixels) { - pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex; - } - - await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); - - execCallback(item.id); - - await tickCanvasRedis(); -}; diff --git a/packages/server/src/workers/canvas_cache.ts b/packages/server/src/workers/canvas_cache.ts new file mode 100644 index 0000000..486062e --- /dev/null +++ b/packages/server/src/workers/canvas_cache.ts @@ -0,0 +1,164 @@ +/** + * Cache the contents of the database into redis keys + * + * Each cache chunk should aim be 100x100 pixels + */ + +import { parentPort } from "node:worker_threads"; +import { getLogger } from "../lib/Logger"; +import { Redis } from "../lib/redis"; +import { prisma } from "../lib/prisma"; + +// TODO: config maybe? +// this value is hardcoded in #getCanvasSectionFromCoords +const canvasSectionSize = [100, 100]; + +type Message = + | { type: "id"; workerId: number } + | { + type: "cache"; + start: [x: number, y: number]; + end: [x: number, y: number]; + callbackId: string; + } + | { + type: "write_pixel"; + }; + +let Logger = getLogger("CANVAS_WORK"); + +/** + * We run the connection directly instead of via class functions to prevent side effects + */ +const redis = Redis.client; +redis.connect().then(() => { + Logger.info("Connected to Redis"); +}); + +let workerId: number; + +parentPort?.on("message", (msg: Message) => { + switch (msg.type) { + case "id": + workerId = msg.workerId; + Logger = getLogger("CANVAS_WORK", workerId); + Logger.info("Received worker ID assignment: " + workerId); + startWriteQueue().then(() => {}); + break; + case "cache": + doCache(msg.start, msg.end).then(() => { + parentPort?.postMessage({ + type: "callback", + callbackId: msg.callbackId, + }); + }); + break; + } +}); + +/** + * Get canvas section from coordinates + * + * @note This is hardcoded to expect the section size to be 100x100 pixels + * + * @param x + * @param y + */ +const getCanvasSectionFromCoords = ( + x: number, + y: number +): { start: [x: number, y: number]; end: [x: number, y: number] } => { + // since we are assuming the section size is 100x100 + // we can get the start position based on the hundreds position + const baseX = Math.floor((x % 1000) / 100); // get the hundreds + const baseY = Math.floor((y % 1000) / 100); // get the hundreds + + return { + start: [baseX * 100, baseY * 100], + end: [baseX * 100 + 100, baseY * 100 + 100], + }; +}; + +const startWriteQueue = async () => { + const item = await redis.lPop( + Redis.key("canvas_cache_write_queue", workerId) + ); + if (!item) { + setTimeout(() => { + startWriteQueue(); + }, 250); + return; + } + + const x = parseInt(item.split(",")[0]); + const y = parseInt(item.split(",")[1]); + const color = item.split(",")[2]; + + const section = getCanvasSectionFromCoords(x, y); + + const pixels: string[] = ( + (await redis.get( + Redis.key("canvas_section", section.start, section.end) + )) || "" + ).split(","); + + const arrX = x - section.start[0]; + const arrY = y - section.start[1]; + + pixels[canvasSectionSize[0] * arrY + arrX] = color; + + await redis.set( + Redis.key("canvas_section", section.start, section.end), + pixels.join(",") + ); + + startWriteQueue(); +}; + +const doCache = async ( + start: [x: number, y: number], + end: [x: number, y: number] +) => { + const now = Date.now(); + Logger.info( + "starting cache of section " + start.join(",") + " -> " + end.join(",") + ); + const dbpixels = await prisma.pixel.findMany({ + where: { + x: { + gte: start[0], + lt: end[0], + }, + y: { + gte: start[1], + lt: end[1], + }, + isTop: true, + }, + }); + + const pixels: string[] = []; + + // (y -> x) because of how the conversion needs to be done later + // if this is inverted, the map will flip when rebuilding the cache (5 minute expiry) + // fixes #24 + for (let y = start[1]; y < end[1]; y++) { + for (let x = start[0]; x < end[0]; x++) { + pixels.push( + dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent" + ); + } + } + + await redis.set(Redis.key("canvas_section", start, end), pixels.join(",")); + + Logger.info( + "finished cache of section " + + start.join(",") + + " -> " + + end.join(",") + + " in " + + ((Date.now() - now) / 1000).toFixed(2) + + "s" + ); +}; diff --git a/packages/server/src/workers/worker.ts b/packages/server/src/workers/worker.ts index c50dd7c..b5d137a 100644 --- a/packages/server/src/workers/worker.ts +++ b/packages/server/src/workers/worker.ts @@ -1,8 +1,12 @@ import { Worker, WorkerOptions } from "node:worker_threads"; import path from "node:path"; +import { v4 as uuid } from "uuid"; import { getLogger } from "../lib/Logger"; const Logger = getLogger("WORKER_ROOT"); +export const CACHE_WORKERS = process.env.CACHE_WORKERS + ? parseInt(process.env.CACHE_WORKERS) || 1 + : 1; export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => { if (process.env.NODE_ENV === "production") { @@ -34,11 +38,141 @@ export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => { } }; -const AllWorkers = { - canvas: spawnWorker("canvas.ts"), +// not used as of right now +// dedicated worker threads for specific tasks would go here +const AllWorkers: { [k: string]: Worker } = {}; + +let cacheWorkers: Worker[] = []; + +/** + * Return consistent worker ID for the specified coordinates + * @param x + * @param y + * @returns + */ +export const getCacheWorkerIdForCoords = (x: number, y: number): number => { + const key = (x + y) % cacheWorkers.length; + return key; }; -export const CanvasWorker = AllWorkers.canvas; +/** + * Return consistent worker for the specified coordinates + * @param x + * @param y + * @returns + */ +export const getCacheWorkerForCoords = (x: number, y: number): Worker => { + return cacheWorkers[getCacheWorkerIdForCoords(x, y)]; +}; + +/** + * Spawns cache workers + * + * Promise resolves when all of them are alive + * + * @param num + */ +export const spawnCacheWorkers = async (num?: number): Promise => { + if (typeof num === "undefined") { + // if the function isn't told, use the environment variables + num = CACHE_WORKERS; + } + + Logger.info(`Spawning ${num} cache workers...`); + + let pending: Promise[] = []; + + for (let i = 0; i < num; i++) { + const worker = spawnWorker("canvas_cache"); + + pending.push( + new Promise((res) => { + worker.on("online", () => { + Logger.info(`Canvas cache worker #${i} is now online`); + + worker.postMessage({ type: "id", workerId: i }); + + res(undefined); + }); + }) + ); + + worker.on("error", (err) => { + Logger.error(`Canvas cache worker #${i} has errored`); + console.error(err); + }); + + worker.on("exit", (exit) => { + Logger.warn(`Canvas cache worker #${i} has exited ${exit}`); + const index = cacheWorkers.indexOf(worker); + if (index > -1) { + cacheWorkers.splice(index, 1); + Logger.info(`Removed dead worker #${i} from pool`); + } + }); + + setupWorkerCallback(worker); + + cacheWorkers.push(worker); + } + + await Promise.allSettled(pending); + Logger.info(`Successfully spawned ${num} cache workers`); +}; + +export const getCanvasCacheWorker = () => { + return cacheWorkers[Math.floor(Math.random() * cacheWorkers.length)]; +}; + +let cacheWorkerQueue: { [k: string]: () => any } = {}; + +/** + * Prometheus metrics + * @returns + */ +export const getCacheWorkerQueueLength = () => + Object.keys(cacheWorkerQueue).length; + +const setupWorkerCallback = (worker: Worker) => { + worker.on("message", (message: { type: "callback"; callbackId: number }) => { + if (message.type !== "callback") return; + + const callback = cacheWorkerQueue[message.callbackId]; + if (!callback) { + Logger.warn( + "Received callback message from worker, but no callbacks are waiting " + + message.callbackId + ); + return; + } + + callback(); + delete cacheWorkerQueue[message.callbackId]; + }); +}; + +export const callCacheWorker = (type: string, data: any) => { + return new Promise((res) => { + const callbackId = uuid(); + + cacheWorkerQueue[callbackId] = () => { + res(); + clearTimeout(watchdog); + }; + let watchdog = setTimeout(() => { + Logger.error( + `Callback for ${type} ${callbackId} has taken too long, is it dead?` + ); + }, 10000); + + const worker = getCanvasCacheWorker(); + worker.postMessage({ + ...data, + type, + callbackId, + }); + }); +}; export const callWorkerMethod = (worker: Worker, type: string, data: any) => { return new Promise((res) => {