Merge branch 'rewrite-canvas-caching' into 'main'

Rewrite canvas caching

Closes #86

See merge request sc07/canvas!12
This commit is contained in:
Grant 2024-07-13 21:02:18 +00:00
commit 2b26d25750
17 changed files with 590 additions and 294 deletions

20
package-lock.json generated
View File

@ -6592,6 +6592,12 @@
"resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.5.tgz",
"integrity": "sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw=="
},
"node_modules/@types/uuid": {
"version": "10.0.0",
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz",
"integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==",
"dev": true
},
"node_modules/@typescript-eslint/eslint-plugin": {
"version": "5.62.0",
"resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.62.0.tgz",
@ -16480,6 +16486,7 @@
"rate-limit-redis": "^4.2.0",
"redis": "^4.6.12",
"socket.io": "^4.7.2",
"uuid": "^10.0.0",
"winston": "^3.11.0"
},
"devDependencies": {
@ -16487,6 +16494,7 @@
"@types/cors": "^2.8.17",
"@types/express": "^4.17.17",
"@types/express-session": "^1.17.7",
"@types/uuid": "^10.0.0",
"@typescript-eslint/eslint-plugin": "^7.1.0",
"@typescript-eslint/parser": "^7.1.0",
"dotenv": "^16.3.1",
@ -16783,6 +16791,18 @@
"engines": {
"node": ">= 0.8"
}
},
"packages/server/node_modules/uuid": {
"version": "10.0.0",
"resolved": "https://registry.npmjs.org/uuid/-/uuid-10.0.0.tgz",
"integrity": "sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==",
"funding": [
"https://github.com/sponsors/broofa",
"https://github.com/sponsors/ctavan"
],
"bin": {
"uuid": "dist/bin/uuid"
}
}
}
}

View File

@ -102,10 +102,25 @@ export class Canvas extends EventEmitter<CanvasEvents> {
if (Object.keys(this.pixels).length > 0)
Network.clearPreviousState("canvas");
Network.waitForState("canvas").then(([pixels]) => {
console.log("loadConfig just received new canvas data");
this.handleBatch(pixels);
// Network.waitForState("canvas").then(([pixels]) => {
// console.log("loadConfig just received new canvas data");
// this.handleBatch(pixels);
// });
Network.on("canvas", (start, end, pixels) => {
console.log("[Canvas] received canvas section");
this.handleBatch(start, end, pixels);
});
const chunks = Network.getCanvasChunks();
console.log(`[Canvas] Received ${chunks.length} chunks to load`);
let loaded = 0;
for (const chunk of chunks) {
console.log(`[Canvas] Loading canvas chunk ${loaded}...`);
this.handleBatch(chunk.start, chunk.end, chunk.pixels);
loaded++;
}
}
hasConfig() {
@ -267,27 +282,36 @@ export class Canvas extends EventEmitter<CanvasEvents> {
getRenderer().usePixels(serializeBuild);
};
handleBatch = (pixels: string[]) => {
handleBatch = (
start: [x: number, y: number],
end: [x: number, y: number],
pixels: string[]
) => {
if (!this.config.canvas) {
throw new Error("handleBatch called with no config");
}
let serializeBuild: CanvasPixel[] = [];
const width = end[0] - start[0];
const height = end[1] - start[1];
for (let x = 0; x < this.config.canvas.size[0]; x++) {
for (let y = 0; y < this.config.canvas.size[1]; y++) {
const hex = pixels[this.config.canvas.size[0] * y + x];
for (let x = 0; x < width; x++) {
for (let y = 0; y < height; y++) {
const hex = pixels[width * y + x];
const palette = this.Pallete.getColorFromHex(hex);
const canvasX = x + start[0];
const canvasY = y + start[1];
// we still store a copy of the pixels in this instance for non-rendering functions
this.pixels[x + "_" + y] = {
this.pixels[canvasX + "_" + canvasY] = {
type: "full",
color: palette?.id || -1,
};
serializeBuild.push({
x,
y,
x: canvasX,
y: canvasY,
hex: hex === "transparent" ? "null" : hex,
});
}

View File

@ -8,8 +8,6 @@ export type CanvasPixel = {
hex: string;
};
const bezier = (n: number) => n * n * (3 - 2 * n);
const isWorker = () => {
return (
// @ts-ignore
@ -41,8 +39,18 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
private blank?: RCanvas;
private blank_ctx?: RContext;
private pixels: CanvasPixel[] = [];
private allPixels: CanvasPixel[] = [];
/**
* Pixels that need to be drawn next draw call
*
* Key = x,y (eg 0,0)
*/
private pixels: Map<string, string> = new Map();
/**
* Every pixel
*
* Key = x,y (eg 0,0)
*/
private allPixels: Map<string, string> = new Map();
private isWorker = isWorker();
private _stopRender = false;
@ -87,37 +95,15 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
}
usePixels(pixels: CanvasPixel[], replace = false) {
if (replace) {
this.pixels = pixels;
this.allPixels = pixels;
} else {
for (const pixel of pixels) {
this.usePixel(pixel);
}
for (const pixel of pixels) {
this.usePixel(pixel);
}
}
usePixel(pixel: CanvasPixel) {
{
let existing = this.pixels.find(
(p) => p.x === pixel.x && p.y === pixel.y
);
if (existing) {
this.pixels.splice(this.pixels.indexOf(existing), 1);
}
}
{
let existing = this.allPixels.find(
(p) => p.x === pixel.x && p.y === pixel.y
);
if (existing) {
this.allPixels.splice(this.allPixels.indexOf(existing), 1);
}
}
this.pixels.push(pixel);
this.allPixels.push(pixel);
let key = pixel.x + "," + pixel.y;
this.pixels.set(key, pixel.hex);
this.allPixels.set(key, pixel.hex);
}
startRender() {
@ -179,16 +165,19 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
draw() {
const start = performance.now();
const pixels = [...this.pixels];
this.pixels = [];
const pixels = new Map(this.pixels);
this.pixels.clear();
if (pixels.length) {
console.log("[CanvasRenderer#draw] drawing " + pixels.length + " pixels");
if (pixels.size) {
console.log("[CanvasRenderer#draw] drawing " + pixels.size + " pixels");
}
for (const pixel of pixels) {
this.ctx.fillStyle = pixel.hex === "null" ? "#fff" : "#" + pixel.hex;
this.ctx.fillRect(pixel.x, pixel.y, 1, 1);
for (const [x_y, hex] of pixels) {
const x = parseInt(x_y.split(",")[0]);
const y = parseInt(x_y.split(",")[1]);
this.ctx.fillStyle = hex === "null" ? "#fff" : "#" + hex;
this.ctx.fillRect(x, y, 1, 1);
}
const diff = performance.now() - start;
@ -219,9 +208,12 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
this.ctx.fillStyle = "#fff";
this.ctx.fillRect(0, 0, this.canvas.width, this.canvas.height);
for (const pixel of this.allPixels) {
this.ctx.fillStyle = pixel.hex === "null" ? "#fff" : "#" + pixel.hex;
this.ctx.fillRect(pixel.x, pixel.y, 1, 1);
for (const [x_y, hex] of this.allPixels) {
const x = parseInt(x_y.split(",")[0]);
const y = parseInt(x_y.split(",")[1]);
this.ctx.fillStyle = hex === "null" ? "#fff" : "#" + hex;
this.ctx.fillRect(x, y, 1, 1);
}
}
@ -234,11 +226,13 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
ctx.clearRect(0, 0, canvas.width, canvas.height);
for (const pixel of this.allPixels) {
if (pixel.hex !== "null") continue;
for (const [x_y, hex] of this.allPixels) {
if (hex !== "null") continue;
const x = parseInt(x_y.split(",")[0]);
const y = parseInt(x_y.split(",")[1]);
ctx.fillStyle = "rgba(0,140,0,0.5)";
ctx.fillRect(pixel.x, pixel.y, 1, 1);
ctx.fillRect(x, y, 1, 1);
}
}
}

View File

@ -20,7 +20,11 @@ export interface INetworkEvents {
user: (user: AuthSession) => void;
standing: (standing: IAccountStanding) => void;
config: (user: ClientConfig) => void;
canvas: (pixels: string[]) => void;
canvas: (
start: [x: number, y: number],
end: [x: number, y: number],
pixels: string[]
) => void;
pixels: (data: { available: number }) => void;
pixelLastPlaced: (time: number) => void;
online: (count: number) => void;
@ -55,6 +59,12 @@ class Network extends EventEmitter<INetworkEvents> {
[key in keyof INetworkEvents]?: SentEventValue<key>;
} = {};
private canvasChunks: {
start: [number, number];
end: [number, number];
pixels: string[];
}[] = [];
constructor() {
super();
@ -123,8 +133,14 @@ class Network extends EventEmitter<INetworkEvents> {
this.emit("config", config);
});
this.socket.on("canvas", (pixels) => {
this.acceptState("canvas", pixels);
this.socket.on("canvas", (start, end, pixels) => {
// this.acceptState("canvas", start, end, pixels);
this.emit("canvas", start, end, pixels);
this.canvasChunks.push({ start, end, pixels });
});
this.socket.on("clearCanvasChunks", () => {
this.canvasChunks = [];
});
this.socket.on("availablePixels", (count) => {
@ -191,6 +207,10 @@ class Network extends EventEmitter<INetworkEvents> {
delete this.stateEvents[ev];
}
getCanvasChunks() {
return this.canvasChunks;
}
/**
* Wait for event, either being already sent, or new one
*

View File

@ -3,7 +3,12 @@
export type Subscription = "heatmap";
export interface ServerToClientEvents {
canvas: (pixels: string[]) => void;
canvas: (
start: [x: number, y: number],
end: [x: number, y: number],
pixels: string[]
) => void;
clearCanvasChunks: () => void;
user: (user: AuthSession) => void;
standing: (standing: IAccountStanding) => void;
config: (config: ClientConfig) => void;

View File

@ -20,6 +20,7 @@
"@types/cors": "^2.8.17",
"@types/express": "^4.17.17",
"@types/express-session": "^1.17.7",
"@types/uuid": "^10.0.0",
"@typescript-eslint/eslint-plugin": "^7.1.0",
"@typescript-eslint/parser": "^7.1.0",
"dotenv": "^16.3.1",
@ -44,6 +45,7 @@
"rate-limit-redis": "^4.2.0",
"redis": "^4.6.12",
"socket.io": "^4.7.2",
"uuid": "^10.0.0",
"winston": "^3.11.0"
}
}

View File

@ -6,6 +6,8 @@ import { getLogger } from "../lib/Logger";
import Canvas from "../lib/Canvas";
import { RateLimiter } from "../lib/RateLimiter";
import { Instance } from "../models/Instance";
import { callCacheWorker } from "../workers/worker";
import { Redis } from "../lib/redis";
const Logger = getLogger("HTTP/CLIENT");

View File

@ -7,6 +7,7 @@ import { SocketServer } from "./lib/SocketServer";
import { OpenID } from "./lib/oidc";
import { loadSettings } from "./lib/Settings";
import "./workers/worker";
import { spawnCacheWorkers } from "./workers/worker";
const Logger = getLogger("MAIN");
@ -85,12 +86,17 @@ if (!process.env.PIXEL_LOG_PATH) {
Logger.warn("PIXEL_LOG_PATH is not defined, defaulting to packages/server");
}
if (!process.env.CACHE_WORKERS) {
Logger.warn("CACHE_WORKERS is not defined, defaulting to 1 worker");
}
// run startup tasks, all of these need to be completed to serve
Promise.all([
Redis.getClient(),
OpenID.setup().then(() => {
Logger.info("Setup OpenID");
}),
spawnCacheWorkers(),
loadSettings(),
]).then(() => {
Logger.info("Startup tasks have completed, starting server");

View File

@ -1,14 +1,26 @@
import { CanvasConfig } from "@sc07-canvas/lib/src/net";
import {
CanvasConfig,
ClientToServerEvents,
ServerToClientEvents,
} from "@sc07-canvas/lib/src/net";
import { prisma } from "./prisma";
import { Redis } from "./redis";
import { SocketServer } from "./SocketServer";
import { getLogger } from "./Logger";
import { Pixel } from "@prisma/client";
import { CanvasWorker, callWorkerMethod } from "../workers/worker";
import {
callCacheWorker,
callWorkerMethod,
getCacheWorkerIdForCoords,
getCanvasCacheWorker,
} from "../workers/worker";
import { LogMan } from "./LogMan";
import { Socket } from "socket.io";
const Logger = getLogger("CANVAS");
const canvasSectionSize = [100, 100]; // TODO: config maybe?
class Canvas {
/**
* Size of the canvas
@ -68,14 +80,10 @@ class Canvas {
/**
* Change size of the canvas
*
* Expensive task, will take a bit
*
* @param width
* @param height
*/
async setSize(width: number, height: number, useStatic = false) {
CanvasWorker.postMessage({ type: "canvasSize", width, height });
if (useStatic) {
this.canvasSize = [width, height];
return;
@ -100,7 +108,7 @@ class Canvas {
},
});
// the redis key is 1D, since the dimentions changed we need to update it
// update cached canvas chunks
await this.canvasToRedis();
// this gets called on startup, before the SocketServer is initialized
@ -109,10 +117,9 @@ class Canvas {
// announce the new config, which contains the canvas size
SocketServer.instance.broadcastConfig();
// announce new pixel array that was generated previously
await this.getPixelsArray().then((pixels) => {
SocketServer.instance?.io.emit("canvas", pixels);
});
// announce all canvas chunks
SocketServer.instance.io.emit("clearCanvasChunks");
await this.broadcastCanvasChunks();
} else {
Logger.warn(
"[Canvas#setSize] No SocketServer instance, cannot broadcast config change"
@ -126,6 +133,54 @@ class Canvas {
);
}
async sendCanvasChunksToSocket(
socket: Socket<ClientToServerEvents, ServerToClientEvents>
) {
await this.getAllChunks((start, end, data) => {
socket.emit("canvas", start, end, data.split(","));
});
}
async broadcastCanvasChunks() {
await this.getAllChunks((start, end, data) => {
SocketServer.instance.io.emit("canvas", start, end, data.split(","));
});
}
async getAllChunks(
chunkCallback: (
start: [x: number, y: number],
end: [x: number, y: number],
data: string
) => any
) {
const redis = await Redis.getClient();
let pending: Promise<void>[] = [];
for (let x = 0; x < this.canvasSize[0]; x += canvasSectionSize[0]) {
for (let y = 0; y < this.canvasSize[1]; y += canvasSectionSize[1]) {
const start: [number, number] = [x, y];
const end: [number, number] = [
x + canvasSectionSize[0],
y + canvasSectionSize[1],
];
pending.push(
new Promise((res) => {
redis
.get(Redis.key("canvas_section", start, end))
.then((value): any => {
chunkCallback(start, end, value || "");
res();
});
})
);
}
}
await Promise.allSettled(pending);
}
async forceUpdatePixelIsTop() {
const now = Date.now();
Logger.info("[Canvas#forceUpdatePixelIsTop] is starting...");
@ -210,67 +265,65 @@ class Canvas {
}
/**
* Converts database pixels to Redis string
* Chunks canvas pixels and caches chunks in redis
*
* @worker
* @returns
*/
canvasToRedis(): Promise<string[]> {
return new Promise((res) => {
Logger.info("Triggering canvasToRedis()");
const [width, height] = this.getCanvasConfig().size;
async canvasToRedis(): Promise<void> {
const start = Date.now();
CanvasWorker.once("message", (msg) => {
if (msg.type === "canvasToRedis") {
Logger.info("Finished canvasToRedis()");
res(msg.data);
}
});
let pending: Promise<any>[] = [];
CanvasWorker.postMessage({
type: "canvasToRedis",
width,
height,
});
});
for (let x = 0; x < this.canvasSize[0]; x += canvasSectionSize[0]) {
for (let y = 0; y < this.canvasSize[1]; y += canvasSectionSize[1]) {
pending.push(
callCacheWorker("cache", {
start: [x, y],
end: [x + canvasSectionSize[0], y + canvasSectionSize[1]],
})
);
}
}
await Promise.allSettled(pending);
Logger.info(
`Finished canvasToRedis() in ${((Date.now() - start) / 1000).toFixed(2)}s`
);
}
/**
* force an update at a specific position
*/
async updateCanvasRedisAtPos(x: number, y: number) {
const redis = await Redis.getClient();
const dbpixel = await this.getPixel(x, y);
await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", {
x,
y,
hex: dbpixel?.color || "transparent",
});
// ensure pixels in the same location are always in the same queue
const workerId = getCacheWorkerIdForCoords(x, y);
// queue canvas writes in redis to avoid memory issues in badly written queue code
redis.lPush(
Redis.key("canvas_cache_write_queue", workerId),
x + "," + y + "," + (dbpixel?.color || "transparent")
);
}
async updateCanvasRedisWithBatch(
pixelBatch: { x: number; y: number; hex: string }[]
) {
await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", {
batch: pixelBatch,
});
}
async isPixelArrayCached() {
const redis = await Redis.getClient();
return await redis.exists(Redis.key("canvas"));
}
for (const pixel of pixelBatch) {
// ensure pixels in the same location are always in the same queue
const workerId = getCacheWorkerIdForCoords(pixel.x, pixel.y);
async getPixelsArray() {
const redis = await Redis.getClient();
if (await redis.exists(Redis.key("canvas"))) {
const cached = await redis.get(Redis.key("canvas"));
return cached!.split(",");
// queue canvas writes in redis
redis.lPush(
Redis.key("canvas_cache_write_queue", workerId),
[pixel.x, pixel.y, pixel.hex].join(",")
);
}
return await this.canvasToRedis();
}
/**
@ -468,7 +521,6 @@ class Canvas {
*/
async generateHeatmap() {
const redis_set = await Redis.getClient("MAIN");
const redis_sub = await Redis.getClient("SUB");
const now = Date.now();
const minimumDate = new Date();

View File

@ -12,12 +12,17 @@ const formatter = format.printf((options) => {
maxModuleWidth = Math.max(maxModuleWidth, `[${module}]`.length);
}
let moduleName = options.moduleName;
if (typeof options.workerId !== "undefined") {
moduleName += " #" + options.workerId;
}
let modulePadding = " ".repeat(
Math.max(0, maxModuleWidth - `[${options.moduleName}]`.length)
Math.max(0, maxModuleWidth - `[${moduleName}]`.length)
);
let parts: string[] = [
options.timestamp + ` [${options.moduleName || "---"}]` + modulePadding,
options.timestamp + ` [${moduleName || "---"}]` + modulePadding,
options.level + ":",
options.message,
];
@ -58,5 +63,7 @@ export const LoggerType = createEnum([
"RECAPTCHA",
]);
export const getLogger = (module?: keyof typeof LoggerType) =>
Winston.child({ moduleName: module });
export const getLogger = (
module?: keyof typeof LoggerType,
workerId?: number
) => Winston.child({ moduleName: module, workerId });

View File

@ -4,6 +4,7 @@ import e from "express";
import { SocketServer } from "./SocketServer";
import Canvas from "./Canvas";
import { Redis } from "./redis";
import { CACHE_WORKERS, getCacheWorkerQueueLength } from "../workers/worker";
client.collectDefaultMetrics({
labels: process.env.NODE_APP_INSTANCE
@ -87,6 +88,34 @@ export const TotalPixels = new client.Gauge({
},
});
const CacheWorkerQueueMain = new client.Gauge({
name: "cache_worker_callback_queue_main",
help: "cache worker callback queue length for main process",
collect() {
this.set(getCacheWorkerQueueLength());
},
});
const CacheWorkerQueueWorkers = new client.Gauge({
name: "cache_worker_queue_workers",
help: "cache worker write queue length per worker process",
labelNames: ["worker_id"],
async collect() {
const redis = await Redis.getClient();
for (let i = 0; i < CACHE_WORKERS; i++) {
this.set(
{
worker_id: i,
},
await redis.lLen(Redis.key("canvas_cache_write_queue", i))
);
}
},
});
export const handleMetricsEndpoint = async (
req: e.Request,
res: e.Response

View File

@ -165,6 +165,7 @@ export class SocketServer {
user?.sockets.add(socket);
Logger.debug("handleConnection " + user?.sockets.size);
socket.emit("clearCanvasChunks");
Redis.getClient().then((redis) => {
if (user) redis.set(Redis.key("socketToSub", socket.id), user.sub);
@ -197,29 +198,12 @@ export class SocketServer {
socket.emit("config", getClientConfig());
{
let _clientNotifiedAboutCache = false;
Canvas.isPixelArrayCached().then((cached) => {
if (!cached) {
_clientNotifiedAboutCache = true;
socket.emit("alert", {
id: "canvas_cache_pending",
is: "toast",
action: "system",
severity: "info",
title: "Canvas loading",
body: "Canvas not cached, this may take a couple seconds",
autoDismiss: true,
});
}
});
Canvas.getPixelsArray().then((pixels) => {
socket.emit("canvas", pixels);
socket.emit("alert_dismiss", "canvas_cache_pending");
Canvas.sendCanvasChunksToSocket(socket).then(() => {
socket.emit("alert", {
is: "toast",
action: "system",
severity: "success",
title: "Canvas loaded!",
title: "Canvas loaded",
autoDismiss: true,
});
});

View File

@ -9,8 +9,13 @@ const Logger = getLogger("REDIS");
*/
interface IRedisKeys {
// canvas
canvas(): string;
// canvas(): string;
heatmap(): string;
canvas_section(
start: [x: number, y: number],
end: [x: number, y: number]
): string;
canvas_cache_write_queue(workerId: number): string;
// users
socketToSub(socketId: string): string;
@ -23,8 +28,11 @@ interface IRedisKeys {
* Defined as a variable due to boottime augmentation
*/
const RedisKeys: IRedisKeys = {
canvas: () => `CANVAS:PIXELS`,
// canvas: () => `CANVAS:PIXELS`,
heatmap: () => `CANVAS:HEATMAP`,
canvas_section: (start, end) =>
`CANVAS:PIXELS:${start.join(",")}:${end.join(",")}`,
canvas_cache_write_queue: (workerId) => `CANVAS:CACHE_QUEUE:${workerId}`,
socketToSub: (socketId: string) => `CANVAS:SOCKET:${socketId}`,
channel_heatmap: () => `CANVAS:HEATMAP`,
};

View File

@ -62,12 +62,14 @@ declare global {
MATRIX_GENERAL_ALIAS: string;
PIXEL_LOG_PATH?: string;
RECAPTCHA_SITE_KEY?: string;
RECAPTCHA_SECRET_KEY?: string;
RECAPTCHA_PIXEL_CHANCE?: string;
DISCORD_WEBHOOK?: string;
CACHE_WORKERS?: string;
}
}
}

View File

@ -1,157 +0,0 @@
import { parentPort } from "node:worker_threads";
import { Redis } from "../lib/redis";
import { prisma } from "../lib/prisma";
import { getLogger } from "../lib/Logger";
import { Pixel } from "@prisma/client";
type Message =
| { type: "canvasSize"; width: number; height: number }
| {
type: "canvasToRedis";
}
| {
type: "updateCanvasRedisAtPos";
callbackId: number;
x: number;
y: number;
hex: string | "transparent";
}
| {
type: "updateCanvasRedisWithBatch";
callbackId: number;
batch: { x: number; y: number; hex: string }[];
};
const Logger = getLogger("CANVAS_WORK");
/**
* We run the connection directly instead of via class functions to prevent side effects
*/
const redis = Redis.client;
redis.connect().then(() => {
Logger.info("Connected to Redis");
});
const queuedCanvasRedis: {
id: number;
pixels: { x: number; y: number; hex: string | "transparent" }[];
}[] = [];
let canvasSize = { width: -1, height: -1 };
parentPort?.on("message", (msg: Message) => {
switch (msg.type) {
case "canvasSize":
canvasSize = { width: msg.width, height: msg.height };
Logger.info("Received canvasSize " + JSON.stringify(canvasSize));
break;
case "canvasToRedis":
if (canvasSize.width === -1 || canvasSize.height === -1) {
Logger.error("Received canvasToRedis but i do not have the dimentions");
return;
}
canvasToRedis(canvasSize.width, canvasSize.height).then((str) => {
parentPort?.postMessage({ type: "canvasToRedis", data: str });
});
break;
case "updateCanvasRedisAtPos":
queuedCanvasRedis.push({
id: msg.callbackId,
pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }],
});
startCanvasRedisIfNeeded();
break;
case "updateCanvasRedisWithBatch":
queuedCanvasRedis.push({
id: msg.callbackId,
pixels: msg.batch,
});
startCanvasRedisIfNeeded();
break;
}
});
const execCallback = (id: number) => {
parentPort?.postMessage({ type: "callback", callbackId: id });
};
/**
* Convert database pixels to Redis cache
*
* This does not depend on the Canvas class and can be ran inside the worker
*
* @param width
* @param height
* @returns
*/
const canvasToRedis = async (width: number, height: number) => {
const now = Date.now();
Logger.info("Starting canvasToRedis...");
const dbpixels = await prisma.pixel.findMany({
where: {
x: {
gte: 0,
lt: width,
},
y: {
gte: 0,
lt: height,
},
isTop: true,
},
});
const pixels: string[] = [];
// (y -> x) because of how the conversion needs to be done later
// if this is inverted, the map will flip when rebuilding the cache (5 minute expiry)
// fixes #24
for (let y = 0; y < height; y++) {
for (let x = 0; x < width; x++) {
pixels.push(
dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent"
);
}
}
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
Logger.info(
"Finished canvasToRedis in " + ((Date.now() - now) / 1000).toFixed(2) + "s"
);
return pixels;
};
let isCanvasRedisWorking = false;
const startCanvasRedisIfNeeded = () => {
if (isCanvasRedisWorking) return;
tickCanvasRedis();
};
const tickCanvasRedis = async () => {
isCanvasRedisWorking = true;
const item = queuedCanvasRedis.shift();
if (!item) {
isCanvasRedisWorking = false;
return;
}
const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split(
","
);
for (const pixel of item.pixels) {
pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex;
}
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
execCallback(item.id);
await tickCanvasRedis();
};

View File

@ -0,0 +1,164 @@
/**
* Cache the contents of the database into redis keys
*
* Each cache chunk should aim be 100x100 pixels
*/
import { parentPort } from "node:worker_threads";
import { getLogger } from "../lib/Logger";
import { Redis } from "../lib/redis";
import { prisma } from "../lib/prisma";
// TODO: config maybe?
// <!> this value is hardcoded in #getCanvasSectionFromCoords
const canvasSectionSize = [100, 100];
type Message =
| { type: "id"; workerId: number }
| {
type: "cache";
start: [x: number, y: number];
end: [x: number, y: number];
callbackId: string;
}
| {
type: "write_pixel";
};
let Logger = getLogger("CANVAS_WORK");
/**
* We run the connection directly instead of via class functions to prevent side effects
*/
const redis = Redis.client;
redis.connect().then(() => {
Logger.info("Connected to Redis");
});
let workerId: number;
parentPort?.on("message", (msg: Message) => {
switch (msg.type) {
case "id":
workerId = msg.workerId;
Logger = getLogger("CANVAS_WORK", workerId);
Logger.info("Received worker ID assignment: " + workerId);
startWriteQueue().then(() => {});
break;
case "cache":
doCache(msg.start, msg.end).then(() => {
parentPort?.postMessage({
type: "callback",
callbackId: msg.callbackId,
});
});
break;
}
});
/**
* Get canvas section from coordinates
*
* @note This is hardcoded to expect the section size to be 100x100 pixels
*
* @param x
* @param y
*/
const getCanvasSectionFromCoords = (
x: number,
y: number
): { start: [x: number, y: number]; end: [x: number, y: number] } => {
// since we are assuming the section size is 100x100
// we can get the start position based on the hundreds position
const baseX = Math.floor((x % 1000) / 100); // get the hundreds
const baseY = Math.floor((y % 1000) / 100); // get the hundreds
return {
start: [baseX * 100, baseY * 100],
end: [baseX * 100 + 100, baseY * 100 + 100],
};
};
const startWriteQueue = async () => {
const item = await redis.lPop(
Redis.key("canvas_cache_write_queue", workerId)
);
if (!item) {
setTimeout(() => {
startWriteQueue();
}, 250);
return;
}
const x = parseInt(item.split(",")[0]);
const y = parseInt(item.split(",")[1]);
const color = item.split(",")[2];
const section = getCanvasSectionFromCoords(x, y);
const pixels: string[] = (
(await redis.get(
Redis.key("canvas_section", section.start, section.end)
)) || ""
).split(",");
const arrX = x - section.start[0];
const arrY = y - section.start[1];
pixels[canvasSectionSize[0] * arrY + arrX] = color;
await redis.set(
Redis.key("canvas_section", section.start, section.end),
pixels.join(",")
);
startWriteQueue();
};
const doCache = async (
start: [x: number, y: number],
end: [x: number, y: number]
) => {
const now = Date.now();
Logger.info(
"starting cache of section " + start.join(",") + " -> " + end.join(",")
);
const dbpixels = await prisma.pixel.findMany({
where: {
x: {
gte: start[0],
lt: end[0],
},
y: {
gte: start[1],
lt: end[1],
},
isTop: true,
},
});
const pixels: string[] = [];
// (y -> x) because of how the conversion needs to be done later
// if this is inverted, the map will flip when rebuilding the cache (5 minute expiry)
// fixes #24
for (let y = start[1]; y < end[1]; y++) {
for (let x = start[0]; x < end[0]; x++) {
pixels.push(
dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent"
);
}
}
await redis.set(Redis.key("canvas_section", start, end), pixels.join(","));
Logger.info(
"finished cache of section " +
start.join(",") +
" -> " +
end.join(",") +
" in " +
((Date.now() - now) / 1000).toFixed(2) +
"s"
);
};

View File

@ -1,8 +1,12 @@
import { Worker, WorkerOptions } from "node:worker_threads";
import path from "node:path";
import { v4 as uuid } from "uuid";
import { getLogger } from "../lib/Logger";
const Logger = getLogger("WORKER_ROOT");
export const CACHE_WORKERS = process.env.CACHE_WORKERS
? parseInt(process.env.CACHE_WORKERS) || 1
: 1;
export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => {
if (process.env.NODE_ENV === "production") {
@ -34,11 +38,141 @@ export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => {
}
};
const AllWorkers = {
canvas: spawnWorker("canvas.ts"),
// not used as of right now
// dedicated worker threads for specific tasks would go here
const AllWorkers: { [k: string]: Worker } = {};
let cacheWorkers: Worker[] = [];
/**
* Return consistent worker ID for the specified coordinates
* @param x
* @param y
* @returns
*/
export const getCacheWorkerIdForCoords = (x: number, y: number): number => {
const key = (x + y) % cacheWorkers.length;
return key;
};
export const CanvasWorker = AllWorkers.canvas;
/**
* Return consistent worker for the specified coordinates
* @param x
* @param y
* @returns
*/
export const getCacheWorkerForCoords = (x: number, y: number): Worker => {
return cacheWorkers[getCacheWorkerIdForCoords(x, y)];
};
/**
* Spawns cache workers
*
* Promise resolves when all of them are alive
*
* @param num
*/
export const spawnCacheWorkers = async (num?: number): Promise<void> => {
if (typeof num === "undefined") {
// if the function isn't told, use the environment variables
num = CACHE_WORKERS;
}
Logger.info(`Spawning ${num} cache workers...`);
let pending: Promise<any>[] = [];
for (let i = 0; i < num; i++) {
const worker = spawnWorker("canvas_cache");
pending.push(
new Promise((res) => {
worker.on("online", () => {
Logger.info(`Canvas cache worker #${i} is now online`);
worker.postMessage({ type: "id", workerId: i });
res(undefined);
});
})
);
worker.on("error", (err) => {
Logger.error(`Canvas cache worker #${i} has errored`);
console.error(err);
});
worker.on("exit", (exit) => {
Logger.warn(`Canvas cache worker #${i} has exited ${exit}`);
const index = cacheWorkers.indexOf(worker);
if (index > -1) {
cacheWorkers.splice(index, 1);
Logger.info(`Removed dead worker #${i} from pool`);
}
});
setupWorkerCallback(worker);
cacheWorkers.push(worker);
}
await Promise.allSettled(pending);
Logger.info(`Successfully spawned ${num} cache workers`);
};
export const getCanvasCacheWorker = () => {
return cacheWorkers[Math.floor(Math.random() * cacheWorkers.length)];
};
let cacheWorkerQueue: { [k: string]: () => any } = {};
/**
* Prometheus metrics
* @returns
*/
export const getCacheWorkerQueueLength = () =>
Object.keys(cacheWorkerQueue).length;
const setupWorkerCallback = (worker: Worker) => {
worker.on("message", (message: { type: "callback"; callbackId: number }) => {
if (message.type !== "callback") return;
const callback = cacheWorkerQueue[message.callbackId];
if (!callback) {
Logger.warn(
"Received callback message from worker, but no callbacks are waiting " +
message.callbackId
);
return;
}
callback();
delete cacheWorkerQueue[message.callbackId];
});
};
export const callCacheWorker = (type: string, data: any) => {
return new Promise<void>((res) => {
const callbackId = uuid();
cacheWorkerQueue[callbackId] = () => {
res();
clearTimeout(watchdog);
};
let watchdog = setTimeout(() => {
Logger.error(
`Callback for ${type} ${callbackId} has taken too long, is it dead?`
);
}, 10000);
const worker = getCanvasCacheWorker();
worker.postMessage({
...data,
type,
callbackId,
});
});
};
export const callWorkerMethod = (worker: Worker, type: string, data: any) => {
return new Promise<void>((res) => {