Move canvasToRedis to a worker thread

Fixes #68
This commit is contained in:
Grant 2024-07-09 21:01:28 -06:00
parent 7c2dfbbbf8
commit a7a9b35da6
5 changed files with 150 additions and 42 deletions

View File

@ -6,6 +6,7 @@ import { ExpressServer } from "./lib/Express";
import { SocketServer } from "./lib/SocketServer";
import { OpenID } from "./lib/oidc";
import { loadSettings } from "./lib/Settings";
import "./workers/worker";
const Logger = getLogger("MAIN");

View File

@ -4,6 +4,7 @@ import { Redis } from "./redis";
import { SocketServer } from "./SocketServer";
import { getLogger } from "./Logger";
import { Pixel } from "@prisma/client";
import { CanvasWorker } from "../workers/worker";
const Logger = getLogger("CANVAS");
@ -162,50 +163,29 @@ class Canvas {
}
/**
* Database pixels -> single Redis comma separated list of hex
* @returns 1D array of pixel values
* Converts database pixels to Redis string
*
* @worker
* @returns
*/
async canvasToRedis() {
const now = Date.now();
Logger.info("Starting canvasToRedis...");
const redis = await Redis.getClient();
canvasToRedis(): Promise<string[]> {
return new Promise((res) => {
Logger.info("Triggering canvasToRedis()");
const [width, height] = this.getCanvasConfig().size;
const dbpixels = await prisma.pixel.findMany({
where: {
x: {
gte: 0,
lt: this.getCanvasConfig().size[0],
},
y: {
gte: 0,
lt: this.getCanvasConfig().size[1],
},
isTop: true,
},
CanvasWorker.once("message", (msg) => {
if (msg.type === "canvasToRedis") {
Logger.info("Finished canvasToRedis()");
res(msg.data);
}
});
CanvasWorker.postMessage({
type: "canvasToRedis",
width,
height,
});
});
const pixels: string[] = [];
// (y -> x) because of how the conversion needs to be done later
// if this is inverted, the map will flip when rebuilding the cache (5 minute expiry)
// fixes #24
for (let y = 0; y < this.canvasSize[1]; y++) {
for (let x = 0; x < this.canvasSize[0]; x++) {
pixels.push(
dbpixels.find((px) => px.x === x && px.y === y)?.color ||
"transparent"
);
}
}
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
Logger.info(
"Finished canvasToRedis in " +
((Date.now() - now) / 1000).toFixed(2) +
"s"
);
return pixels;
}
/**
@ -450,7 +430,7 @@ class Canvas {
await redis_set.setEx(Redis.key("heatmap"), 60 * 5, heatmapStr);
// notify anyone interested about the new heatmap
await redis_sub.publish(Redis.key("channel_heatmap"), heatmapStr);
await redis_set.publish(Redis.key("channel_heatmap"), heatmapStr);
// SocketServer.instance.io.to("sub:heatmap").emit("heatmap", heatmapStr);
return heatmapStr;

View File

@ -36,6 +36,8 @@ export const LoggerType = createEnum([
"REDIS",
"SOCKET",
"JOB_WORKER",
"CANVAS_WORK",
"WORKER_ROOT",
]);
export const getLogger = (module?: keyof typeof LoggerType) =>

View File

@ -0,0 +1,78 @@
import { parentPort } from "node:worker_threads";
import { Redis } from "../lib/redis";
import { prisma } from "../lib/prisma";
import { getLogger } from "../lib/Logger";
type Message = {
type: "canvasToRedis";
width: number;
height: number;
};
const Logger = getLogger("CANVAS_WORK");
/**
* We run the connection directly instead of via class functions to prevent side effects
*/
const redis = Redis.client;
redis.connect().then(() => {
Logger.info("Connected to Redis");
});
parentPort?.on("message", (msg: Message) => {
switch (msg.type) {
case "canvasToRedis":
canvasToRedis(msg.width, msg.height).then((str) => {
parentPort?.postMessage({ type: "canvasToRedis", data: str });
});
break;
}
});
/**
* Convert database pixels to Redis cache
*
* This does not depend on the Canvas class and can be ran inside the worker
*
* @param width
* @param height
* @returns
*/
const canvasToRedis = async (width: number, height: number) => {
const now = Date.now();
Logger.info("Starting canvasToRedis...");
const dbpixels = await prisma.pixel.findMany({
where: {
x: {
gte: 0,
lt: width,
},
y: {
gte: 0,
lt: height,
},
isTop: true,
},
});
const pixels: string[] = [];
// (y -> x) because of how the conversion needs to be done later
// if this is inverted, the map will flip when rebuilding the cache (5 minute expiry)
// fixes #24
for (let y = 0; y < height; y++) {
for (let x = 0; x < width; x++) {
pixels.push(
dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent"
);
}
}
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
Logger.info(
"Finished canvasToRedis in " + ((Date.now() - now) / 1000).toFixed(2) + "s"
);
return pixels;
};

View File

@ -0,0 +1,47 @@
import { Worker, WorkerOptions } from "node:worker_threads";
import path from "node:path";
import { getLogger } from "../lib/Logger";
const Logger = getLogger("WORKER_ROOT");
export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => {
if (process.env.NODE_ENV === "production") file = file.replace(".ts", ".js");
// https://github.com/TypeStrong/ts-node/issues/676#issuecomment-531620154
wkOpts.eval = true;
if (!wkOpts.workerData) {
wkOpts.workerData = {};
}
wkOpts.workerData.__filename = path.join(__dirname, file);
return new Worker(
`
const wk = require('worker_threads');
require('ts-node').register();
let file = wk.workerData.__filename;
delete wk.workerData.__filename;
require(file);
`,
wkOpts
);
};
const AllWorkers = {
canvas: spawnWorker("canvas.ts"),
};
export const CanvasWorker = AllWorkers.canvas;
for (const [name, worker] of Object.entries(AllWorkers)) {
worker.on("online", () => {
Logger.info(`${name} worker is now online`);
});
worker.on("exit", (exitCode) => {
Logger.warn(`${name} worker has exited ${exitCode}`);
});
worker.on("error", (err) => {
Logger.warn(`${name} worker has errored ${err.message}`);
console.error(err);
});
}