Rewrite canvas caching
This commit is contained in:
parent
582bfa568e
commit
b99a0cb7ed
|
@ -6592,6 +6592,12 @@
|
|||
"resolved": "https://registry.npmjs.org/@types/triple-beam/-/triple-beam-1.3.5.tgz",
|
||||
"integrity": "sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw=="
|
||||
},
|
||||
"node_modules/@types/uuid": {
|
||||
"version": "10.0.0",
|
||||
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-10.0.0.tgz",
|
||||
"integrity": "sha512-7gqG38EyHgyP1S+7+xomFtL+ZNHcKv6DwNaCZmJmo1vgMugyF3TCnXVg4t1uk89mLNwnLtnY3TpOpCOyp1/xHQ==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/@typescript-eslint/eslint-plugin": {
|
||||
"version": "5.62.0",
|
||||
"resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-5.62.0.tgz",
|
||||
|
@ -16480,6 +16486,7 @@
|
|||
"rate-limit-redis": "^4.2.0",
|
||||
"redis": "^4.6.12",
|
||||
"socket.io": "^4.7.2",
|
||||
"uuid": "^10.0.0",
|
||||
"winston": "^3.11.0"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
@ -16487,6 +16494,7 @@
|
|||
"@types/cors": "^2.8.17",
|
||||
"@types/express": "^4.17.17",
|
||||
"@types/express-session": "^1.17.7",
|
||||
"@types/uuid": "^10.0.0",
|
||||
"@typescript-eslint/eslint-plugin": "^7.1.0",
|
||||
"@typescript-eslint/parser": "^7.1.0",
|
||||
"dotenv": "^16.3.1",
|
||||
|
@ -16783,6 +16791,18 @@
|
|||
"engines": {
|
||||
"node": ">= 0.8"
|
||||
}
|
||||
},
|
||||
"packages/server/node_modules/uuid": {
|
||||
"version": "10.0.0",
|
||||
"resolved": "https://registry.npmjs.org/uuid/-/uuid-10.0.0.tgz",
|
||||
"integrity": "sha512-8XkAphELsDnEGrDxUOHB3RGvXz6TeuYSGEZBOjtTtPm2lwhGBjLgOzLHB63IUWfBpNucQjND6d3AOudO+H3RWQ==",
|
||||
"funding": [
|
||||
"https://github.com/sponsors/broofa",
|
||||
"https://github.com/sponsors/ctavan"
|
||||
],
|
||||
"bin": {
|
||||
"uuid": "dist/bin/uuid"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -102,10 +102,25 @@ export class Canvas extends EventEmitter<CanvasEvents> {
|
|||
if (Object.keys(this.pixels).length > 0)
|
||||
Network.clearPreviousState("canvas");
|
||||
|
||||
Network.waitForState("canvas").then(([pixels]) => {
|
||||
console.log("loadConfig just received new canvas data");
|
||||
this.handleBatch(pixels);
|
||||
// Network.waitForState("canvas").then(([pixels]) => {
|
||||
// console.log("loadConfig just received new canvas data");
|
||||
// this.handleBatch(pixels);
|
||||
// });
|
||||
|
||||
Network.on("canvas", (start, end, pixels) => {
|
||||
console.log("[Canvas] received canvas section");
|
||||
this.handleBatch(start, end, pixels);
|
||||
});
|
||||
|
||||
const chunks = Network.getCanvasChunks();
|
||||
console.log(`[Canvas] Received ${chunks.length} chunks to load`);
|
||||
let loaded = 0;
|
||||
for (const chunk of chunks) {
|
||||
console.log(`[Canvas] Loading canvas chunk ${loaded}...`);
|
||||
this.handleBatch(chunk.start, chunk.end, chunk.pixels);
|
||||
|
||||
loaded++;
|
||||
}
|
||||
}
|
||||
|
||||
hasConfig() {
|
||||
|
@ -267,27 +282,36 @@ export class Canvas extends EventEmitter<CanvasEvents> {
|
|||
getRenderer().usePixels(serializeBuild);
|
||||
};
|
||||
|
||||
handleBatch = (pixels: string[]) => {
|
||||
handleBatch = (
|
||||
start: [x: number, y: number],
|
||||
end: [x: number, y: number],
|
||||
pixels: string[]
|
||||
) => {
|
||||
if (!this.config.canvas) {
|
||||
throw new Error("handleBatch called with no config");
|
||||
}
|
||||
|
||||
let serializeBuild: CanvasPixel[] = [];
|
||||
const width = end[0] - start[0];
|
||||
const height = end[1] - start[1];
|
||||
|
||||
for (let x = 0; x < this.config.canvas.size[0]; x++) {
|
||||
for (let y = 0; y < this.config.canvas.size[1]; y++) {
|
||||
const hex = pixels[this.config.canvas.size[0] * y + x];
|
||||
for (let x = 0; x < width; x++) {
|
||||
for (let y = 0; y < height; y++) {
|
||||
const hex = pixels[width * y + x];
|
||||
const palette = this.Pallete.getColorFromHex(hex);
|
||||
|
||||
const canvasX = x + start[0];
|
||||
const canvasY = y + start[1];
|
||||
|
||||
// we still store a copy of the pixels in this instance for non-rendering functions
|
||||
this.pixels[x + "_" + y] = {
|
||||
this.pixels[canvasX + "_" + canvasY] = {
|
||||
type: "full",
|
||||
color: palette?.id || -1,
|
||||
};
|
||||
|
||||
serializeBuild.push({
|
||||
x,
|
||||
y,
|
||||
x: canvasX,
|
||||
y: canvasY,
|
||||
hex: hex === "transparent" ? "null" : hex,
|
||||
});
|
||||
}
|
||||
|
|
|
@ -8,8 +8,6 @@ export type CanvasPixel = {
|
|||
hex: string;
|
||||
};
|
||||
|
||||
const bezier = (n: number) => n * n * (3 - 2 * n);
|
||||
|
||||
const isWorker = () => {
|
||||
return (
|
||||
// @ts-ignore
|
||||
|
@ -41,8 +39,18 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
|
|||
private blank?: RCanvas;
|
||||
private blank_ctx?: RContext;
|
||||
|
||||
private pixels: CanvasPixel[] = [];
|
||||
private allPixels: CanvasPixel[] = [];
|
||||
/**
|
||||
* Pixels that need to be drawn next draw call
|
||||
*
|
||||
* Key = x,y (eg 0,0)
|
||||
*/
|
||||
private pixels: Map<string, string> = new Map();
|
||||
/**
|
||||
* Every pixel
|
||||
*
|
||||
* Key = x,y (eg 0,0)
|
||||
*/
|
||||
private allPixels: Map<string, string> = new Map();
|
||||
private isWorker = isWorker();
|
||||
|
||||
private _stopRender = false;
|
||||
|
@ -87,37 +95,15 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
|
|||
}
|
||||
|
||||
usePixels(pixels: CanvasPixel[], replace = false) {
|
||||
if (replace) {
|
||||
this.pixels = pixels;
|
||||
this.allPixels = pixels;
|
||||
} else {
|
||||
for (const pixel of pixels) {
|
||||
this.usePixel(pixel);
|
||||
}
|
||||
for (const pixel of pixels) {
|
||||
this.usePixel(pixel);
|
||||
}
|
||||
}
|
||||
|
||||
usePixel(pixel: CanvasPixel) {
|
||||
{
|
||||
let existing = this.pixels.find(
|
||||
(p) => p.x === pixel.x && p.y === pixel.y
|
||||
);
|
||||
if (existing) {
|
||||
this.pixels.splice(this.pixels.indexOf(existing), 1);
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
let existing = this.allPixels.find(
|
||||
(p) => p.x === pixel.x && p.y === pixel.y
|
||||
);
|
||||
if (existing) {
|
||||
this.allPixels.splice(this.allPixels.indexOf(existing), 1);
|
||||
}
|
||||
}
|
||||
|
||||
this.pixels.push(pixel);
|
||||
this.allPixels.push(pixel);
|
||||
let key = pixel.x + "," + pixel.y;
|
||||
this.pixels.set(key, pixel.hex);
|
||||
this.allPixels.set(key, pixel.hex);
|
||||
}
|
||||
|
||||
startRender() {
|
||||
|
@ -179,16 +165,19 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
|
|||
draw() {
|
||||
const start = performance.now();
|
||||
|
||||
const pixels = [...this.pixels];
|
||||
this.pixels = [];
|
||||
const pixels = new Map(this.pixels);
|
||||
this.pixels.clear();
|
||||
|
||||
if (pixels.length) {
|
||||
console.log("[CanvasRenderer#draw] drawing " + pixels.length + " pixels");
|
||||
if (pixels.size) {
|
||||
console.log("[CanvasRenderer#draw] drawing " + pixels.size + " pixels");
|
||||
}
|
||||
|
||||
for (const pixel of pixels) {
|
||||
this.ctx.fillStyle = pixel.hex === "null" ? "#fff" : "#" + pixel.hex;
|
||||
this.ctx.fillRect(pixel.x, pixel.y, 1, 1);
|
||||
for (const [x_y, hex] of pixels) {
|
||||
const x = parseInt(x_y.split(",")[0]);
|
||||
const y = parseInt(x_y.split(",")[1]);
|
||||
|
||||
this.ctx.fillStyle = hex === "null" ? "#fff" : "#" + hex;
|
||||
this.ctx.fillRect(x, y, 1, 1);
|
||||
}
|
||||
|
||||
const diff = performance.now() - start;
|
||||
|
@ -219,9 +208,12 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
|
|||
this.ctx.fillStyle = "#fff";
|
||||
this.ctx.fillRect(0, 0, this.canvas.width, this.canvas.height);
|
||||
|
||||
for (const pixel of this.allPixels) {
|
||||
this.ctx.fillStyle = pixel.hex === "null" ? "#fff" : "#" + pixel.hex;
|
||||
this.ctx.fillRect(pixel.x, pixel.y, 1, 1);
|
||||
for (const [x_y, hex] of this.allPixels) {
|
||||
const x = parseInt(x_y.split(",")[0]);
|
||||
const y = parseInt(x_y.split(",")[1]);
|
||||
|
||||
this.ctx.fillStyle = hex === "null" ? "#fff" : "#" + hex;
|
||||
this.ctx.fillRect(x, y, 1, 1);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -234,11 +226,13 @@ export class CanvasRenderer extends EventEmitter<RendererEvents> {
|
|||
|
||||
ctx.clearRect(0, 0, canvas.width, canvas.height);
|
||||
|
||||
for (const pixel of this.allPixels) {
|
||||
if (pixel.hex !== "null") continue;
|
||||
for (const [x_y, hex] of this.allPixels) {
|
||||
if (hex !== "null") continue;
|
||||
const x = parseInt(x_y.split(",")[0]);
|
||||
const y = parseInt(x_y.split(",")[1]);
|
||||
|
||||
ctx.fillStyle = "rgba(0,140,0,0.5)";
|
||||
ctx.fillRect(pixel.x, pixel.y, 1, 1);
|
||||
ctx.fillRect(x, y, 1, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,7 +20,11 @@ export interface INetworkEvents {
|
|||
user: (user: AuthSession) => void;
|
||||
standing: (standing: IAccountStanding) => void;
|
||||
config: (user: ClientConfig) => void;
|
||||
canvas: (pixels: string[]) => void;
|
||||
canvas: (
|
||||
start: [x: number, y: number],
|
||||
end: [x: number, y: number],
|
||||
pixels: string[]
|
||||
) => void;
|
||||
pixels: (data: { available: number }) => void;
|
||||
pixelLastPlaced: (time: number) => void;
|
||||
online: (count: number) => void;
|
||||
|
@ -55,6 +59,12 @@ class Network extends EventEmitter<INetworkEvents> {
|
|||
[key in keyof INetworkEvents]?: SentEventValue<key>;
|
||||
} = {};
|
||||
|
||||
private canvasChunks: {
|
||||
start: [number, number];
|
||||
end: [number, number];
|
||||
pixels: string[];
|
||||
}[] = [];
|
||||
|
||||
constructor() {
|
||||
super();
|
||||
|
||||
|
@ -123,8 +133,14 @@ class Network extends EventEmitter<INetworkEvents> {
|
|||
this.emit("config", config);
|
||||
});
|
||||
|
||||
this.socket.on("canvas", (pixels) => {
|
||||
this.acceptState("canvas", pixels);
|
||||
this.socket.on("canvas", (start, end, pixels) => {
|
||||
// this.acceptState("canvas", start, end, pixels);
|
||||
this.emit("canvas", start, end, pixels);
|
||||
this.canvasChunks.push({ start, end, pixels });
|
||||
});
|
||||
|
||||
this.socket.on("clearCanvasChunks", () => {
|
||||
this.canvasChunks = [];
|
||||
});
|
||||
|
||||
this.socket.on("availablePixels", (count) => {
|
||||
|
@ -191,6 +207,10 @@ class Network extends EventEmitter<INetworkEvents> {
|
|||
delete this.stateEvents[ev];
|
||||
}
|
||||
|
||||
getCanvasChunks() {
|
||||
return this.canvasChunks;
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait for event, either being already sent, or new one
|
||||
*
|
||||
|
|
|
@ -3,7 +3,12 @@
|
|||
export type Subscription = "heatmap";
|
||||
|
||||
export interface ServerToClientEvents {
|
||||
canvas: (pixels: string[]) => void;
|
||||
canvas: (
|
||||
start: [x: number, y: number],
|
||||
end: [x: number, y: number],
|
||||
pixels: string[]
|
||||
) => void;
|
||||
clearCanvasChunks: () => void;
|
||||
user: (user: AuthSession) => void;
|
||||
standing: (standing: IAccountStanding) => void;
|
||||
config: (config: ClientConfig) => void;
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
"@types/cors": "^2.8.17",
|
||||
"@types/express": "^4.17.17",
|
||||
"@types/express-session": "^1.17.7",
|
||||
"@types/uuid": "^10.0.0",
|
||||
"@typescript-eslint/eslint-plugin": "^7.1.0",
|
||||
"@typescript-eslint/parser": "^7.1.0",
|
||||
"dotenv": "^16.3.1",
|
||||
|
@ -44,6 +45,7 @@
|
|||
"rate-limit-redis": "^4.2.0",
|
||||
"redis": "^4.6.12",
|
||||
"socket.io": "^4.7.2",
|
||||
"uuid": "^10.0.0",
|
||||
"winston": "^3.11.0"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import { getLogger } from "../lib/Logger";
|
|||
import Canvas from "../lib/Canvas";
|
||||
import { RateLimiter } from "../lib/RateLimiter";
|
||||
import { Instance } from "../models/Instance";
|
||||
import { callCacheWorker } from "../workers/worker";
|
||||
import { Redis } from "../lib/redis";
|
||||
|
||||
const Logger = getLogger("HTTP/CLIENT");
|
||||
|
||||
|
|
|
@ -7,6 +7,7 @@ import { SocketServer } from "./lib/SocketServer";
|
|||
import { OpenID } from "./lib/oidc";
|
||||
import { loadSettings } from "./lib/Settings";
|
||||
import "./workers/worker";
|
||||
import { spawnCacheWorkers } from "./workers/worker";
|
||||
|
||||
const Logger = getLogger("MAIN");
|
||||
|
||||
|
@ -85,12 +86,17 @@ if (!process.env.PIXEL_LOG_PATH) {
|
|||
Logger.warn("PIXEL_LOG_PATH is not defined, defaulting to packages/server");
|
||||
}
|
||||
|
||||
if (!process.env.CACHE_WORKERS) {
|
||||
Logger.warn("CACHE_WORKERS is not defined, defaulting to 1 worker");
|
||||
}
|
||||
|
||||
// run startup tasks, all of these need to be completed to serve
|
||||
Promise.all([
|
||||
Redis.getClient(),
|
||||
OpenID.setup().then(() => {
|
||||
Logger.info("Setup OpenID");
|
||||
}),
|
||||
spawnCacheWorkers(),
|
||||
loadSettings(),
|
||||
]).then(() => {
|
||||
Logger.info("Startup tasks have completed, starting server");
|
||||
|
|
|
@ -1,14 +1,26 @@
|
|||
import { CanvasConfig } from "@sc07-canvas/lib/src/net";
|
||||
import {
|
||||
CanvasConfig,
|
||||
ClientToServerEvents,
|
||||
ServerToClientEvents,
|
||||
} from "@sc07-canvas/lib/src/net";
|
||||
import { prisma } from "./prisma";
|
||||
import { Redis } from "./redis";
|
||||
import { SocketServer } from "./SocketServer";
|
||||
import { getLogger } from "./Logger";
|
||||
import { Pixel } from "@prisma/client";
|
||||
import { CanvasWorker, callWorkerMethod } from "../workers/worker";
|
||||
import {
|
||||
callCacheWorker,
|
||||
callWorkerMethod,
|
||||
getCacheWorkerIdForCoords,
|
||||
getCanvasCacheWorker,
|
||||
} from "../workers/worker";
|
||||
import { LogMan } from "./LogMan";
|
||||
import { Socket } from "socket.io";
|
||||
|
||||
const Logger = getLogger("CANVAS");
|
||||
|
||||
const canvasSectionSize = [100, 100]; // TODO: config maybe?
|
||||
|
||||
class Canvas {
|
||||
/**
|
||||
* Size of the canvas
|
||||
|
@ -68,14 +80,10 @@ class Canvas {
|
|||
/**
|
||||
* Change size of the canvas
|
||||
*
|
||||
* Expensive task, will take a bit
|
||||
*
|
||||
* @param width
|
||||
* @param height
|
||||
*/
|
||||
async setSize(width: number, height: number, useStatic = false) {
|
||||
CanvasWorker.postMessage({ type: "canvasSize", width, height });
|
||||
|
||||
if (useStatic) {
|
||||
this.canvasSize = [width, height];
|
||||
return;
|
||||
|
@ -100,7 +108,7 @@ class Canvas {
|
|||
},
|
||||
});
|
||||
|
||||
// the redis key is 1D, since the dimentions changed we need to update it
|
||||
// update cached canvas chunks
|
||||
await this.canvasToRedis();
|
||||
|
||||
// this gets called on startup, before the SocketServer is initialized
|
||||
|
@ -109,10 +117,9 @@ class Canvas {
|
|||
// announce the new config, which contains the canvas size
|
||||
SocketServer.instance.broadcastConfig();
|
||||
|
||||
// announce new pixel array that was generated previously
|
||||
await this.getPixelsArray().then((pixels) => {
|
||||
SocketServer.instance?.io.emit("canvas", pixels);
|
||||
});
|
||||
// announce all canvas chunks
|
||||
SocketServer.instance.io.emit("clearCanvasChunks");
|
||||
await this.broadcastCanvasChunks();
|
||||
} else {
|
||||
Logger.warn(
|
||||
"[Canvas#setSize] No SocketServer instance, cannot broadcast config change"
|
||||
|
@ -126,6 +133,54 @@ class Canvas {
|
|||
);
|
||||
}
|
||||
|
||||
async sendCanvasChunksToSocket(
|
||||
socket: Socket<ClientToServerEvents, ServerToClientEvents>
|
||||
) {
|
||||
await this.getAllChunks((start, end, data) => {
|
||||
socket.emit("canvas", start, end, data.split(","));
|
||||
});
|
||||
}
|
||||
|
||||
async broadcastCanvasChunks() {
|
||||
await this.getAllChunks((start, end, data) => {
|
||||
SocketServer.instance.io.emit("canvas", start, end, data.split(","));
|
||||
});
|
||||
}
|
||||
|
||||
async getAllChunks(
|
||||
chunkCallback: (
|
||||
start: [x: number, y: number],
|
||||
end: [x: number, y: number],
|
||||
data: string
|
||||
) => any
|
||||
) {
|
||||
const redis = await Redis.getClient();
|
||||
let pending: Promise<void>[] = [];
|
||||
|
||||
for (let x = 0; x < this.canvasSize[0]; x += canvasSectionSize[0]) {
|
||||
for (let y = 0; y < this.canvasSize[1]; y += canvasSectionSize[1]) {
|
||||
const start: [number, number] = [x, y];
|
||||
const end: [number, number] = [
|
||||
x + canvasSectionSize[0],
|
||||
y + canvasSectionSize[1],
|
||||
];
|
||||
|
||||
pending.push(
|
||||
new Promise((res) => {
|
||||
redis
|
||||
.get(Redis.key("canvas_section", start, end))
|
||||
.then((value): any => {
|
||||
chunkCallback(start, end, value || "");
|
||||
res();
|
||||
});
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.allSettled(pending);
|
||||
}
|
||||
|
||||
async forceUpdatePixelIsTop() {
|
||||
const now = Date.now();
|
||||
Logger.info("[Canvas#forceUpdatePixelIsTop] is starting...");
|
||||
|
@ -210,67 +265,65 @@ class Canvas {
|
|||
}
|
||||
|
||||
/**
|
||||
* Converts database pixels to Redis string
|
||||
* Chunks canvas pixels and caches chunks in redis
|
||||
*
|
||||
* @worker
|
||||
* @returns
|
||||
*/
|
||||
canvasToRedis(): Promise<string[]> {
|
||||
return new Promise((res) => {
|
||||
Logger.info("Triggering canvasToRedis()");
|
||||
const [width, height] = this.getCanvasConfig().size;
|
||||
async canvasToRedis(): Promise<void> {
|
||||
const start = Date.now();
|
||||
|
||||
CanvasWorker.once("message", (msg) => {
|
||||
if (msg.type === "canvasToRedis") {
|
||||
Logger.info("Finished canvasToRedis()");
|
||||
res(msg.data);
|
||||
}
|
||||
});
|
||||
let pending: Promise<any>[] = [];
|
||||
|
||||
CanvasWorker.postMessage({
|
||||
type: "canvasToRedis",
|
||||
width,
|
||||
height,
|
||||
});
|
||||
});
|
||||
for (let x = 0; x < this.canvasSize[0]; x += canvasSectionSize[0]) {
|
||||
for (let y = 0; y < this.canvasSize[1]; y += canvasSectionSize[1]) {
|
||||
pending.push(
|
||||
callCacheWorker("cache", {
|
||||
start: [x, y],
|
||||
end: [x + canvasSectionSize[0], y + canvasSectionSize[1]],
|
||||
})
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await Promise.allSettled(pending);
|
||||
Logger.info(
|
||||
`Finished canvasToRedis() in ${((Date.now() - start) / 1000).toFixed(2)}s`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* force an update at a specific position
|
||||
*/
|
||||
async updateCanvasRedisAtPos(x: number, y: number) {
|
||||
const redis = await Redis.getClient();
|
||||
const dbpixel = await this.getPixel(x, y);
|
||||
|
||||
await callWorkerMethod(CanvasWorker, "updateCanvasRedisAtPos", {
|
||||
x,
|
||||
y,
|
||||
hex: dbpixel?.color || "transparent",
|
||||
});
|
||||
// ensure pixels in the same location are always in the same queue
|
||||
const workerId = getCacheWorkerIdForCoords(x, y);
|
||||
|
||||
// queue canvas writes in redis to avoid memory issues in badly written queue code
|
||||
redis.lPush(
|
||||
Redis.key("canvas_cache_write_queue", workerId),
|
||||
x + "," + y + "," + (dbpixel?.color || "transparent")
|
||||
);
|
||||
}
|
||||
|
||||
async updateCanvasRedisWithBatch(
|
||||
pixelBatch: { x: number; y: number; hex: string }[]
|
||||
) {
|
||||
await callWorkerMethod(CanvasWorker, "updateCanvasRedisWithBatch", {
|
||||
batch: pixelBatch,
|
||||
});
|
||||
}
|
||||
|
||||
async isPixelArrayCached() {
|
||||
const redis = await Redis.getClient();
|
||||
|
||||
return await redis.exists(Redis.key("canvas"));
|
||||
}
|
||||
for (const pixel of pixelBatch) {
|
||||
// ensure pixels in the same location are always in the same queue
|
||||
const workerId = getCacheWorkerIdForCoords(pixel.x, pixel.y);
|
||||
|
||||
async getPixelsArray() {
|
||||
const redis = await Redis.getClient();
|
||||
|
||||
if (await redis.exists(Redis.key("canvas"))) {
|
||||
const cached = await redis.get(Redis.key("canvas"));
|
||||
return cached!.split(",");
|
||||
// queue canvas writes in redis
|
||||
redis.lPush(
|
||||
Redis.key("canvas_cache_write_queue", workerId),
|
||||
[pixel.x, pixel.y, pixel.hex].join(",")
|
||||
);
|
||||
}
|
||||
|
||||
return await this.canvasToRedis();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -468,7 +521,6 @@ class Canvas {
|
|||
*/
|
||||
async generateHeatmap() {
|
||||
const redis_set = await Redis.getClient("MAIN");
|
||||
const redis_sub = await Redis.getClient("SUB");
|
||||
|
||||
const now = Date.now();
|
||||
const minimumDate = new Date();
|
||||
|
|
|
@ -12,12 +12,17 @@ const formatter = format.printf((options) => {
|
|||
maxModuleWidth = Math.max(maxModuleWidth, `[${module}]`.length);
|
||||
}
|
||||
|
||||
let moduleName = options.moduleName;
|
||||
if (typeof options.workerId !== "undefined") {
|
||||
moduleName += " #" + options.workerId;
|
||||
}
|
||||
|
||||
let modulePadding = " ".repeat(
|
||||
Math.max(0, maxModuleWidth - `[${options.moduleName}]`.length)
|
||||
Math.max(0, maxModuleWidth - `[${moduleName}]`.length)
|
||||
);
|
||||
|
||||
let parts: string[] = [
|
||||
options.timestamp + ` [${options.moduleName || "---"}]` + modulePadding,
|
||||
options.timestamp + ` [${moduleName || "---"}]` + modulePadding,
|
||||
options.level + ":",
|
||||
options.message,
|
||||
];
|
||||
|
@ -58,5 +63,7 @@ export const LoggerType = createEnum([
|
|||
"RECAPTCHA",
|
||||
]);
|
||||
|
||||
export const getLogger = (module?: keyof typeof LoggerType) =>
|
||||
Winston.child({ moduleName: module });
|
||||
export const getLogger = (
|
||||
module?: keyof typeof LoggerType,
|
||||
workerId?: number
|
||||
) => Winston.child({ moduleName: module, workerId });
|
||||
|
|
|
@ -4,6 +4,7 @@ import e from "express";
|
|||
import { SocketServer } from "./SocketServer";
|
||||
import Canvas from "./Canvas";
|
||||
import { Redis } from "./redis";
|
||||
import { CACHE_WORKERS, getCacheWorkerQueueLength } from "../workers/worker";
|
||||
|
||||
client.collectDefaultMetrics({
|
||||
labels: process.env.NODE_APP_INSTANCE
|
||||
|
@ -87,6 +88,34 @@ export const TotalPixels = new client.Gauge({
|
|||
},
|
||||
});
|
||||
|
||||
const CacheWorkerQueueMain = new client.Gauge({
|
||||
name: "cache_worker_callback_queue_main",
|
||||
help: "cache worker callback queue length for main process",
|
||||
|
||||
collect() {
|
||||
this.set(getCacheWorkerQueueLength());
|
||||
},
|
||||
});
|
||||
|
||||
const CacheWorkerQueueWorkers = new client.Gauge({
|
||||
name: "cache_worker_queue_workers",
|
||||
help: "cache worker write queue length per worker process",
|
||||
labelNames: ["worker_id"],
|
||||
|
||||
async collect() {
|
||||
const redis = await Redis.getClient();
|
||||
|
||||
for (let i = 0; i < CACHE_WORKERS; i++) {
|
||||
this.set(
|
||||
{
|
||||
worker_id: i,
|
||||
},
|
||||
await redis.lLen(Redis.key("canvas_cache_write_queue", i))
|
||||
);
|
||||
}
|
||||
},
|
||||
});
|
||||
|
||||
export const handleMetricsEndpoint = async (
|
||||
req: e.Request,
|
||||
res: e.Response
|
||||
|
|
|
@ -165,6 +165,7 @@ export class SocketServer {
|
|||
|
||||
user?.sockets.add(socket);
|
||||
Logger.debug("handleConnection " + user?.sockets.size);
|
||||
socket.emit("clearCanvasChunks");
|
||||
|
||||
Redis.getClient().then((redis) => {
|
||||
if (user) redis.set(Redis.key("socketToSub", socket.id), user.sub);
|
||||
|
@ -197,29 +198,12 @@ export class SocketServer {
|
|||
|
||||
socket.emit("config", getClientConfig());
|
||||
{
|
||||
let _clientNotifiedAboutCache = false;
|
||||
Canvas.isPixelArrayCached().then((cached) => {
|
||||
if (!cached) {
|
||||
_clientNotifiedAboutCache = true;
|
||||
socket.emit("alert", {
|
||||
id: "canvas_cache_pending",
|
||||
is: "toast",
|
||||
action: "system",
|
||||
severity: "info",
|
||||
title: "Canvas loading",
|
||||
body: "Canvas not cached, this may take a couple seconds",
|
||||
autoDismiss: true,
|
||||
});
|
||||
}
|
||||
});
|
||||
Canvas.getPixelsArray().then((pixels) => {
|
||||
socket.emit("canvas", pixels);
|
||||
socket.emit("alert_dismiss", "canvas_cache_pending");
|
||||
Canvas.sendCanvasChunksToSocket(socket).then(() => {
|
||||
socket.emit("alert", {
|
||||
is: "toast",
|
||||
action: "system",
|
||||
severity: "success",
|
||||
title: "Canvas loaded!",
|
||||
title: "Canvas loaded",
|
||||
autoDismiss: true,
|
||||
});
|
||||
});
|
||||
|
|
|
@ -9,8 +9,13 @@ const Logger = getLogger("REDIS");
|
|||
*/
|
||||
interface IRedisKeys {
|
||||
// canvas
|
||||
canvas(): string;
|
||||
// canvas(): string;
|
||||
heatmap(): string;
|
||||
canvas_section(
|
||||
start: [x: number, y: number],
|
||||
end: [x: number, y: number]
|
||||
): string;
|
||||
canvas_cache_write_queue(workerId: number): string;
|
||||
|
||||
// users
|
||||
socketToSub(socketId: string): string;
|
||||
|
@ -23,8 +28,11 @@ interface IRedisKeys {
|
|||
* Defined as a variable due to boottime augmentation
|
||||
*/
|
||||
const RedisKeys: IRedisKeys = {
|
||||
canvas: () => `CANVAS:PIXELS`,
|
||||
// canvas: () => `CANVAS:PIXELS`,
|
||||
heatmap: () => `CANVAS:HEATMAP`,
|
||||
canvas_section: (start, end) =>
|
||||
`CANVAS:PIXELS:${start.join(",")}:${end.join(",")}`,
|
||||
canvas_cache_write_queue: (workerId) => `CANVAS:CACHE_QUEUE:${workerId}`,
|
||||
socketToSub: (socketId: string) => `CANVAS:SOCKET:${socketId}`,
|
||||
channel_heatmap: () => `CANVAS:HEATMAP`,
|
||||
};
|
||||
|
|
|
@ -62,12 +62,14 @@ declare global {
|
|||
MATRIX_GENERAL_ALIAS: string;
|
||||
|
||||
PIXEL_LOG_PATH?: string;
|
||||
|
||||
|
||||
RECAPTCHA_SITE_KEY?: string;
|
||||
RECAPTCHA_SECRET_KEY?: string;
|
||||
RECAPTCHA_PIXEL_CHANCE?: string;
|
||||
|
||||
DISCORD_WEBHOOK?: string;
|
||||
|
||||
CACHE_WORKERS?: string;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,157 +0,0 @@
|
|||
import { parentPort } from "node:worker_threads";
|
||||
import { Redis } from "../lib/redis";
|
||||
import { prisma } from "../lib/prisma";
|
||||
import { getLogger } from "../lib/Logger";
|
||||
import { Pixel } from "@prisma/client";
|
||||
|
||||
type Message =
|
||||
| { type: "canvasSize"; width: number; height: number }
|
||||
| {
|
||||
type: "canvasToRedis";
|
||||
}
|
||||
| {
|
||||
type: "updateCanvasRedisAtPos";
|
||||
callbackId: number;
|
||||
x: number;
|
||||
y: number;
|
||||
hex: string | "transparent";
|
||||
}
|
||||
| {
|
||||
type: "updateCanvasRedisWithBatch";
|
||||
callbackId: number;
|
||||
batch: { x: number; y: number; hex: string }[];
|
||||
};
|
||||
|
||||
const Logger = getLogger("CANVAS_WORK");
|
||||
|
||||
/**
|
||||
* We run the connection directly instead of via class functions to prevent side effects
|
||||
*/
|
||||
const redis = Redis.client;
|
||||
redis.connect().then(() => {
|
||||
Logger.info("Connected to Redis");
|
||||
});
|
||||
|
||||
const queuedCanvasRedis: {
|
||||
id: number;
|
||||
pixels: { x: number; y: number; hex: string | "transparent" }[];
|
||||
}[] = [];
|
||||
|
||||
let canvasSize = { width: -1, height: -1 };
|
||||
|
||||
parentPort?.on("message", (msg: Message) => {
|
||||
switch (msg.type) {
|
||||
case "canvasSize":
|
||||
canvasSize = { width: msg.width, height: msg.height };
|
||||
Logger.info("Received canvasSize " + JSON.stringify(canvasSize));
|
||||
break;
|
||||
case "canvasToRedis":
|
||||
if (canvasSize.width === -1 || canvasSize.height === -1) {
|
||||
Logger.error("Received canvasToRedis but i do not have the dimentions");
|
||||
return;
|
||||
}
|
||||
|
||||
canvasToRedis(canvasSize.width, canvasSize.height).then((str) => {
|
||||
parentPort?.postMessage({ type: "canvasToRedis", data: str });
|
||||
});
|
||||
break;
|
||||
case "updateCanvasRedisAtPos":
|
||||
queuedCanvasRedis.push({
|
||||
id: msg.callbackId,
|
||||
pixels: [{ x: msg.x, y: msg.y, hex: msg.hex }],
|
||||
});
|
||||
startCanvasRedisIfNeeded();
|
||||
break;
|
||||
case "updateCanvasRedisWithBatch":
|
||||
queuedCanvasRedis.push({
|
||||
id: msg.callbackId,
|
||||
pixels: msg.batch,
|
||||
});
|
||||
startCanvasRedisIfNeeded();
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
const execCallback = (id: number) => {
|
||||
parentPort?.postMessage({ type: "callback", callbackId: id });
|
||||
};
|
||||
|
||||
/**
|
||||
* Convert database pixels to Redis cache
|
||||
*
|
||||
* This does not depend on the Canvas class and can be ran inside the worker
|
||||
*
|
||||
* @param width
|
||||
* @param height
|
||||
* @returns
|
||||
*/
|
||||
const canvasToRedis = async (width: number, height: number) => {
|
||||
const now = Date.now();
|
||||
Logger.info("Starting canvasToRedis...");
|
||||
|
||||
const dbpixels = await prisma.pixel.findMany({
|
||||
where: {
|
||||
x: {
|
||||
gte: 0,
|
||||
lt: width,
|
||||
},
|
||||
y: {
|
||||
gte: 0,
|
||||
lt: height,
|
||||
},
|
||||
isTop: true,
|
||||
},
|
||||
});
|
||||
|
||||
const pixels: string[] = [];
|
||||
|
||||
// (y -> x) because of how the conversion needs to be done later
|
||||
// if this is inverted, the map will flip when rebuilding the cache (5 minute expiry)
|
||||
// fixes #24
|
||||
for (let y = 0; y < height; y++) {
|
||||
for (let x = 0; x < width; x++) {
|
||||
pixels.push(
|
||||
dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
|
||||
|
||||
Logger.info(
|
||||
"Finished canvasToRedis in " + ((Date.now() - now) / 1000).toFixed(2) + "s"
|
||||
);
|
||||
return pixels;
|
||||
};
|
||||
|
||||
let isCanvasRedisWorking = false;
|
||||
|
||||
const startCanvasRedisIfNeeded = () => {
|
||||
if (isCanvasRedisWorking) return;
|
||||
|
||||
tickCanvasRedis();
|
||||
};
|
||||
|
||||
const tickCanvasRedis = async () => {
|
||||
isCanvasRedisWorking = true;
|
||||
|
||||
const item = queuedCanvasRedis.shift();
|
||||
if (!item) {
|
||||
isCanvasRedisWorking = false;
|
||||
return;
|
||||
}
|
||||
|
||||
const pixels: string[] = ((await redis.get(Redis.key("canvas"))) || "").split(
|
||||
","
|
||||
);
|
||||
|
||||
for (const pixel of item.pixels) {
|
||||
pixels[canvasSize.width * pixel.y + pixel.x] = pixel.hex;
|
||||
}
|
||||
|
||||
await redis.set(Redis.key("canvas"), pixels.join(","), { EX: 60 * 5 });
|
||||
|
||||
execCallback(item.id);
|
||||
|
||||
await tickCanvasRedis();
|
||||
};
|
|
@ -0,0 +1,164 @@
|
|||
/**
|
||||
* Cache the contents of the database into redis keys
|
||||
*
|
||||
* Each cache chunk should aim be 100x100 pixels
|
||||
*/
|
||||
|
||||
import { parentPort } from "node:worker_threads";
|
||||
import { getLogger } from "../lib/Logger";
|
||||
import { Redis } from "../lib/redis";
|
||||
import { prisma } from "../lib/prisma";
|
||||
|
||||
// TODO: config maybe?
|
||||
// <!> this value is hardcoded in #getCanvasSectionFromCoords
|
||||
const canvasSectionSize = [100, 100];
|
||||
|
||||
type Message =
|
||||
| { type: "id"; workerId: number }
|
||||
| {
|
||||
type: "cache";
|
||||
start: [x: number, y: number];
|
||||
end: [x: number, y: number];
|
||||
callbackId: string;
|
||||
}
|
||||
| {
|
||||
type: "write_pixel";
|
||||
};
|
||||
|
||||
let Logger = getLogger("CANVAS_WORK");
|
||||
|
||||
/**
|
||||
* We run the connection directly instead of via class functions to prevent side effects
|
||||
*/
|
||||
const redis = Redis.client;
|
||||
redis.connect().then(() => {
|
||||
Logger.info("Connected to Redis");
|
||||
});
|
||||
|
||||
let workerId: number;
|
||||
|
||||
parentPort?.on("message", (msg: Message) => {
|
||||
switch (msg.type) {
|
||||
case "id":
|
||||
workerId = msg.workerId;
|
||||
Logger = getLogger("CANVAS_WORK", workerId);
|
||||
Logger.info("Received worker ID assignment: " + workerId);
|
||||
startWriteQueue().then(() => {});
|
||||
break;
|
||||
case "cache":
|
||||
doCache(msg.start, msg.end).then(() => {
|
||||
parentPort?.postMessage({
|
||||
type: "callback",
|
||||
callbackId: msg.callbackId,
|
||||
});
|
||||
});
|
||||
break;
|
||||
}
|
||||
});
|
||||
|
||||
/**
|
||||
* Get canvas section from coordinates
|
||||
*
|
||||
* @note This is hardcoded to expect the section size to be 100x100 pixels
|
||||
*
|
||||
* @param x
|
||||
* @param y
|
||||
*/
|
||||
const getCanvasSectionFromCoords = (
|
||||
x: number,
|
||||
y: number
|
||||
): { start: [x: number, y: number]; end: [x: number, y: number] } => {
|
||||
// since we are assuming the section size is 100x100
|
||||
// we can get the start position based on the hundreds position
|
||||
const baseX = Math.floor((x % 1000) / 100); // get the hundreds
|
||||
const baseY = Math.floor((y % 1000) / 100); // get the hundreds
|
||||
|
||||
return {
|
||||
start: [baseX * 100, baseY * 100],
|
||||
end: [baseX * 100 + 100, baseY * 100 + 100],
|
||||
};
|
||||
};
|
||||
|
||||
const startWriteQueue = async () => {
|
||||
const item = await redis.lPop(
|
||||
Redis.key("canvas_cache_write_queue", workerId)
|
||||
);
|
||||
if (!item) {
|
||||
setTimeout(() => {
|
||||
startWriteQueue();
|
||||
}, 250);
|
||||
return;
|
||||
}
|
||||
|
||||
const x = parseInt(item.split(",")[0]);
|
||||
const y = parseInt(item.split(",")[1]);
|
||||
const color = item.split(",")[2];
|
||||
|
||||
const section = getCanvasSectionFromCoords(x, y);
|
||||
|
||||
const pixels: string[] = (
|
||||
(await redis.get(
|
||||
Redis.key("canvas_section", section.start, section.end)
|
||||
)) || ""
|
||||
).split(",");
|
||||
|
||||
const arrX = x - section.start[0];
|
||||
const arrY = y - section.start[1];
|
||||
|
||||
pixels[canvasSectionSize[0] * arrY + arrX] = color;
|
||||
|
||||
await redis.set(
|
||||
Redis.key("canvas_section", section.start, section.end),
|
||||
pixels.join(",")
|
||||
);
|
||||
|
||||
startWriteQueue();
|
||||
};
|
||||
|
||||
const doCache = async (
|
||||
start: [x: number, y: number],
|
||||
end: [x: number, y: number]
|
||||
) => {
|
||||
const now = Date.now();
|
||||
Logger.info(
|
||||
"starting cache of section " + start.join(",") + " -> " + end.join(",")
|
||||
);
|
||||
const dbpixels = await prisma.pixel.findMany({
|
||||
where: {
|
||||
x: {
|
||||
gte: start[0],
|
||||
lt: end[0],
|
||||
},
|
||||
y: {
|
||||
gte: start[1],
|
||||
lt: end[1],
|
||||
},
|
||||
isTop: true,
|
||||
},
|
||||
});
|
||||
|
||||
const pixels: string[] = [];
|
||||
|
||||
// (y -> x) because of how the conversion needs to be done later
|
||||
// if this is inverted, the map will flip when rebuilding the cache (5 minute expiry)
|
||||
// fixes #24
|
||||
for (let y = start[1]; y < end[1]; y++) {
|
||||
for (let x = start[0]; x < end[0]; x++) {
|
||||
pixels.push(
|
||||
dbpixels.find((px) => px.x === x && px.y === y)?.color || "transparent"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
await redis.set(Redis.key("canvas_section", start, end), pixels.join(","));
|
||||
|
||||
Logger.info(
|
||||
"finished cache of section " +
|
||||
start.join(",") +
|
||||
" -> " +
|
||||
end.join(",") +
|
||||
" in " +
|
||||
((Date.now() - now) / 1000).toFixed(2) +
|
||||
"s"
|
||||
);
|
||||
};
|
|
@ -1,8 +1,12 @@
|
|||
import { Worker, WorkerOptions } from "node:worker_threads";
|
||||
import path from "node:path";
|
||||
import { v4 as uuid } from "uuid";
|
||||
import { getLogger } from "../lib/Logger";
|
||||
|
||||
const Logger = getLogger("WORKER_ROOT");
|
||||
export const CACHE_WORKERS = process.env.CACHE_WORKERS
|
||||
? parseInt(process.env.CACHE_WORKERS) || 1
|
||||
: 1;
|
||||
|
||||
export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => {
|
||||
if (process.env.NODE_ENV === "production") {
|
||||
|
@ -34,11 +38,141 @@ export const spawnWorker = (file: string, wkOpts: WorkerOptions = {}) => {
|
|||
}
|
||||
};
|
||||
|
||||
const AllWorkers = {
|
||||
canvas: spawnWorker("canvas.ts"),
|
||||
// not used as of right now
|
||||
// dedicated worker threads for specific tasks would go here
|
||||
const AllWorkers: { [k: string]: Worker } = {};
|
||||
|
||||
let cacheWorkers: Worker[] = [];
|
||||
|
||||
/**
|
||||
* Return consistent worker ID for the specified coordinates
|
||||
* @param x
|
||||
* @param y
|
||||
* @returns
|
||||
*/
|
||||
export const getCacheWorkerIdForCoords = (x: number, y: number): number => {
|
||||
const key = (x + y) % cacheWorkers.length;
|
||||
return key;
|
||||
};
|
||||
|
||||
export const CanvasWorker = AllWorkers.canvas;
|
||||
/**
|
||||
* Return consistent worker for the specified coordinates
|
||||
* @param x
|
||||
* @param y
|
||||
* @returns
|
||||
*/
|
||||
export const getCacheWorkerForCoords = (x: number, y: number): Worker => {
|
||||
return cacheWorkers[getCacheWorkerIdForCoords(x, y)];
|
||||
};
|
||||
|
||||
/**
|
||||
* Spawns cache workers
|
||||
*
|
||||
* Promise resolves when all of them are alive
|
||||
*
|
||||
* @param num
|
||||
*/
|
||||
export const spawnCacheWorkers = async (num?: number): Promise<void> => {
|
||||
if (typeof num === "undefined") {
|
||||
// if the function isn't told, use the environment variables
|
||||
num = CACHE_WORKERS;
|
||||
}
|
||||
|
||||
Logger.info(`Spawning ${num} cache workers...`);
|
||||
|
||||
let pending: Promise<any>[] = [];
|
||||
|
||||
for (let i = 0; i < num; i++) {
|
||||
const worker = spawnWorker("canvas_cache");
|
||||
|
||||
pending.push(
|
||||
new Promise((res) => {
|
||||
worker.on("online", () => {
|
||||
Logger.info(`Canvas cache worker #${i} is now online`);
|
||||
|
||||
worker.postMessage({ type: "id", workerId: i });
|
||||
|
||||
res(undefined);
|
||||
});
|
||||
})
|
||||
);
|
||||
|
||||
worker.on("error", (err) => {
|
||||
Logger.error(`Canvas cache worker #${i} has errored`);
|
||||
console.error(err);
|
||||
});
|
||||
|
||||
worker.on("exit", (exit) => {
|
||||
Logger.warn(`Canvas cache worker #${i} has exited ${exit}`);
|
||||
const index = cacheWorkers.indexOf(worker);
|
||||
if (index > -1) {
|
||||
cacheWorkers.splice(index, 1);
|
||||
Logger.info(`Removed dead worker #${i} from pool`);
|
||||
}
|
||||
});
|
||||
|
||||
setupWorkerCallback(worker);
|
||||
|
||||
cacheWorkers.push(worker);
|
||||
}
|
||||
|
||||
await Promise.allSettled(pending);
|
||||
Logger.info(`Successfully spawned ${num} cache workers`);
|
||||
};
|
||||
|
||||
export const getCanvasCacheWorker = () => {
|
||||
return cacheWorkers[Math.floor(Math.random() * cacheWorkers.length)];
|
||||
};
|
||||
|
||||
let cacheWorkerQueue: { [k: string]: () => any } = {};
|
||||
|
||||
/**
|
||||
* Prometheus metrics
|
||||
* @returns
|
||||
*/
|
||||
export const getCacheWorkerQueueLength = () =>
|
||||
Object.keys(cacheWorkerQueue).length;
|
||||
|
||||
const setupWorkerCallback = (worker: Worker) => {
|
||||
worker.on("message", (message: { type: "callback"; callbackId: number }) => {
|
||||
if (message.type !== "callback") return;
|
||||
|
||||
const callback = cacheWorkerQueue[message.callbackId];
|
||||
if (!callback) {
|
||||
Logger.warn(
|
||||
"Received callback message from worker, but no callbacks are waiting " +
|
||||
message.callbackId
|
||||
);
|
||||
return;
|
||||
}
|
||||
|
||||
callback();
|
||||
delete cacheWorkerQueue[message.callbackId];
|
||||
});
|
||||
};
|
||||
|
||||
export const callCacheWorker = (type: string, data: any) => {
|
||||
return new Promise<void>((res) => {
|
||||
const callbackId = uuid();
|
||||
|
||||
cacheWorkerQueue[callbackId] = () => {
|
||||
res();
|
||||
clearTimeout(watchdog);
|
||||
};
|
||||
let watchdog = setTimeout(() => {
|
||||
Logger.error(
|
||||
`Callback for ${type} ${callbackId} has taken too long, is it dead?`
|
||||
);
|
||||
}, 10000);
|
||||
|
||||
const worker = getCanvasCacheWorker();
|
||||
worker.postMessage({
|
||||
...data,
|
||||
type,
|
||||
callbackId,
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
export const callWorkerMethod = (worker: Worker, type: string, data: any) => {
|
||||
return new Promise<void>((res) => {
|
||||
|
|
Loading…
Reference in New Issue