Queue & run canvas redis cache in worker

This commit is contained in:
Grant 2024-07-13 00:53:24 +00:00
parent 85dd5d76b8
commit d2990845ad
3 changed files with 124 additions and 27 deletions

View File

@ -4,7 +4,7 @@ import { Redis } from "./redis";
import { SocketServer } from "./SocketServer";
import { getLogger } from "./Logger";
import { Pixel } from "@prisma/client";
import { CanvasWorker } from "../workers/worker";
import { CanvasWorker, callWorkerMethod } from "../workers/worker";
import { LogMan } from "./LogMan";
const Logger = getLogger("CANVAS");
@ -74,6 +74,8 @@ class Canvas {
* @param height
*/
async setSize(width: number, height: number, useStatic = false) {
CanvasWorker.postMessage({ type: "canvasSize", width, height });
if (useStatic) {
this.canvasSize = [width, height];
return;
@ -237,33 +239,21 @@ class Canvas {
* force an update at a specific position
*/
async updateCanvasRedisAtPos(x: number, y: number) {
const redis = await Redis.getClient();
const pixels: string[] = (
(await redis.get(Redis.key("canvas"))) || ""
).split(",");
const dbpixel = await this.getPixel(x, y);
pixels[this.canvasSize[0] * y + x] = dbpixel?.color || "transparent";
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", {
x,
y,
hex: dbpixel?.color || "transparent",
});
}
async updateCanvasRedisWithBatch(
pixelBatch: { x: number; y: number; hex: string }[]
) {
const redis = await Redis.getClient();
const pixels: string[] = (
(await redis.get(Redis.key("canvas"))) || ""
).split(",");
for (const pixel of pixelBatch) {
pixels[this.canvasSize[0] * pixel.y + pixel.x] = pixel.hex;
}
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", {
batch: pixelBatch,
});
}
async isPixelArrayCached() {

View File

@ -2,12 +2,25 @@ import { parentPort } from "node:worker_threads";
import { Redis } from "../lib/redis";
import { prisma } from "../lib/prisma";
import { getLogger } from "../lib/Logger";
import { Pixel } from "@prisma/client";
type Message = {
type Message =
| { type: "canvasSize"; width: number; height: number }
| {
type: "canvasToRedis";
width: number;
height: number;
};
}
| {
type: "updateCanvasRedisAtPos";
callbackId: number;
x: number;
y: number;
hex: string | "transparent";
}
| {
type: "updateCanvasRedisWithBatch";
callbackId: number;
batch: { x: number; y: number; hex: string }[];
};
const Logger = getLogger("CANVAS_WORK");
@ -19,16 +32,50 @@ redis.connect().then(() => {
Logger.info("Connected to Redis");
});
const queuedCanvasRedis: {
id: number;
pixels: { x: number; y: number; hex: string | "transparent" }[];
}[] = [];
let canvasSize = { width: -1, height: -1 };
parentPort?.on("message", (msg: Message) => {
switch (msg.type) {
case "canvasSize":
canvasSize = { width: msg.width, height: msg.height };
Logger.info("Received canvasSize " + JSON.stringify(canvasSize));
break;
case "canvasToRedis":
canvasToRedis(msg.width, msg.height).then((str) => {
if (canvasSize.width === -1 || canvasSize.height === -1) {
Logger.error("Received canvasToRedis but i do not have the dimentions");
return;
}
canvasToRedis(canvasSize.width, canvasSize.height).then((str) => {
parentPort?.postMessage({ type: "canvasToRedis", data: str });
});
break;
case "updateCanvasRedisAtPos":
queuedCanvasRedis.push({
id: msg.callbackId,
pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }],
});
startCanvasRedisIfNeeded();
break;
case "updateCanvasRedisWithBatch":
queuedCanvasRedis.push({
id: msg.callbackId,
pixels: msg.batch,
});
startCanvasRedisIfNeeded();
break;
}
});
const execCallback = (id: number) => {
parentPort?.postMessage({ type: "callback", callbackId: id });
};
/**
* Convert database pixels to Redis cache
*
@ -76,3 +123,35 @@ const canvasToRedis = async (width: number, height: number) => {
);
return pixels;
};
let isCanvasRedisWorking = false;
const startCanvasRedisIfNeeded = () => {
if (isCanvasRedisWorking) return;
tickCanvasRedis();
};
const tickCanvasRedis = async () => {
isCanvasRedisWorking = true;
const item = queuedCanvasRedis.shift();
if (!item) {
isCanvasRedisWorking = false;
return;
}
const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split(
","
);
for (const pixel of item.pixels) {
pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex;
}
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
execCallback(item.id);
await tickCanvasRedis();
};

View File

@ -40,6 +40,34 @@ const AllWorkers = {
export const CanvasWorker = AllWorkers.canvas;
export const callWorkerMethod = (worker: Worker, type: string, data: any) => {
return new Promise<void>((res) => {
const callbackId = Math.floor(Math.random() * 99999);
Logger.info(`Calling worker method ${type} ${callbackId}`);
const handleMessage = (message: {
type: "callback";
callbackId: number;
}) => {
if (message.type !== "callback") return;
if (message.callbackId !== callbackId) return;
Logger.info(`Finished worker call ${type} ${callbackId}`);
res();
worker.off("message", handleMessage);
};
worker.on("message", handleMessage);
worker.postMessage({
...data,
type,
callbackId,
});
});
};
for (const [name, worker] of Object.entries(AllWorkers)) {
worker.on("online", () => {
Logger.info(`${name} worker is now online`);