diff --git a/packages/server/src/lib/Canvas.ts b/packages/server/src/lib/Canvas.ts index 30c9e5e..d73ddb8 100644 --- a/packages/server/src/lib/Canvas.ts +++ b/packages/server/src/lib/Canvas.ts @@ -4,7 +4,7 @@ import { Redis } from "./redis"; import { SocketServer } from "./SocketServer"; import { getLogger } from "./Logger"; import { Pixel } from "@prisma/client"; -import { CanvasWorker } from "../workers/worker"; +import { CanvasWorker, callWorkerMethod } from "../workers/worker"; import { LogMan } from "./LogMan"; const Logger = getLogger("CANVAS"); @@ -74,6 +74,8 @@ class Canvas { * @param height */ async setSize(width: number, height: number, useStatic = false) { + CanvasWorker.postMessage({ type: "canvasSize", width, height }); + if (useStatic) { this.canvasSize = [width, height]; return; @@ -237,33 +239,21 @@ class Canvas { * force an update at a specific position */ async updateCanvasRedisAtPos(x: number, y: number) { - const redis = await Redis.getClient(); - - const pixels: string[] = ( - (await redis.get(Redis.key("canvas"))) || "" - ).split(","); - const dbpixel = await this.getPixel(x, y); - pixels[this.canvasSize[0] * y + x] = dbpixel?.color || "transparent"; - - await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); + await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", { + x, + y, + hex: dbpixel?.color || "transparent", + }); } async updateCanvasRedisWithBatch( pixelBatch: { x: number; y: number; hex: string }[] ) { - const redis = await Redis.getClient(); - - const pixels: string[] = ( - (await redis.get(Redis.key("canvas"))) || "" - ).split(","); - - for (const pixel of pixelBatch) { - pixels[this.canvasSize[0] * pixel.y + pixel.x] = pixel.hex; - } - - await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); + await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", { + batch: pixelBatch, + }); } async isPixelArrayCached() { diff --git a/packages/server/src/workers/canvas.ts b/packages/server/src/workers/canvas.ts index 014583c..3a9dd85 100644 --- a/packages/server/src/workers/canvas.ts +++ b/packages/server/src/workers/canvas.ts @@ -2,12 +2,25 @@ import { parentPort } from "node:worker_threads"; import { Redis } from "../lib/redis"; import { prisma } from "../lib/prisma"; import { getLogger } from "../lib/Logger"; +import { Pixel } from "@prisma/client"; -type Message = { - type: "canvasToRedis"; - width: number; - height: number; -}; +type Message = + | { type: "canvasSize"; width: number; height: number } + | { + type: "canvasToRedis"; + } + | { + type: "updateCanvasRedisAtPos"; + callbackId: number; + x: number; + y: number; + hex: string | "transparent"; + } + | { + type: "updateCanvasRedisWithBatch"; + callbackId: number; + batch: { x: number; y: number; hex: string }[]; + }; const Logger = getLogger("CANVAS_WORK"); @@ -19,16 +32,50 @@ redis.connect().then(() => { Logger.info("Connected to Redis"); }); +const queuedCanvasRedis: { + id: number; + pixels: { x: number; y: number; hex: string | "transparent" }[]; +}[] = []; + +let canvasSize = { width: -1, height: -1 }; + parentPort?.on("message", (msg: Message) => { switch (msg.type) { + case "canvasSize": + canvasSize = { width: msg.width, height: msg.height }; + Logger.info("Received canvasSize " + JSON.stringify(canvasSize)); + break; case "canvasToRedis": - canvasToRedis(msg.width, msg.height).then((str) => { + if (canvasSize.width === -1 || canvasSize.height === -1) { + Logger.error("Received canvasToRedis but i do not have the dimentions"); + return; + } + + canvasToRedis(canvasSize.width, canvasSize.height).then((str) => { parentPort?.postMessage({ type: "canvasToRedis", data: str }); }); break; + case "updateCanvasRedisAtPos": + queuedCanvasRedis.push({ + id: msg.callbackId, + pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }], + }); + startCanvasRedisIfNeeded(); + break; + case "updateCanvasRedisWithBatch": + queuedCanvasRedis.push({ + id: msg.callbackId, + pixels: msg.batch, + }); + startCanvasRedisIfNeeded(); + break; } }); +const execCallback = (id: number) => { + parentPort?.postMessage({ type: "callback", callbackId: id }); +}; + /** * Convert database pixels to Redis cache * @@ -76,3 +123,35 @@ const canvasToRedis = async (width: number, height: number) => { ); return pixels; }; + +let isCanvasRedisWorking = false; + +const startCanvasRedisIfNeeded = () => { + if (isCanvasRedisWorking) return; + + tickCanvasRedis(); +}; + +const tickCanvasRedis = async () => { + isCanvasRedisWorking = true; + + const item = queuedCanvasRedis.shift(); + if (!item) { + isCanvasRedisWorking = false; + return; + } + + const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split( + "," + ); + + for (const pixel of item.pixels) { + pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex; + } + + await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 }); + + execCallback(item.id); + + await tickCanvasRedis(); +}; diff --git a/packages/server/src/workers/worker.ts b/packages/server/src/workers/worker.ts index 24bc5b2..c50dd7c 100644 --- a/packages/server/src/workers/worker.ts +++ b/packages/server/src/workers/worker.ts @@ -40,6 +40,34 @@ const AllWorkers = { export const CanvasWorker = AllWorkers.canvas; +export const callWorkerMethod = (worker: Worker, type: string, data: any) => { + return new Promise((res) => { + const callbackId = Math.floor(Math.random() * 99999); + Logger.info(`Calling worker method ${type} ${callbackId}`); + + const handleMessage = (message: { + type: "callback"; + callbackId: number; + }) => { + if (message.type !== "callback") return; + if (message.callbackId !== callbackId) return; + + Logger.info(`Finished worker call ${type} ${callbackId}`); + res(); + + worker.off("message", handleMessage); + }; + + worker.on("message", handleMessage); + + worker.postMessage({ + ...data, + type, + callbackId, + }); + }); +}; + for (const [name, worker] of Object.entries(AllWorkers)) { worker.on("online", () => { Logger.info(`${name} worker is now online`);