diff --git a/README.md b/README.md index c81963348c..724d914df2 100644 --- a/README.md +++ b/README.md @@ -66,7 +66,7 @@ This project supports creating virtual browsers (using https://github.com/m1k1o/ - Make sure you have an SSH key pair set up on the server (`id_rsa` in `~/.ssh` directory) - Add `DOCKER_VM_HOST=localhost` to your .env file (can substitute localhost for a public hostname) - Add `NODE_ENV=development` to .env to enable create-on-demand behavior for VMs -- Configure Redis by adding `REDIS_URL` to your .env file (Redis is required for virtual browser management) +- Configure Postgres by adding `DATABASE_URL` to your .env file (Postgres is required for virtual browser management) ### Room Persistence diff --git a/server/cleanup.ts b/server/cleanup.ts index d5a38d6433..85bc24e19d 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -1,11 +1,4 @@ -import { Client } from 'pg'; -import config from './config'; - -const postgres = new Client({ - connectionString: config.DATABASE_URL, - ssl: { rejectUnauthorized: false }, -}); -postgres.connect(); +import { postgres } from './utils/postgres'; cleanupPostgres(); setInterval(cleanupPostgres, 5 * 60 * 1000); diff --git a/server/config.ts b/server/config.ts index 1d69074907..9871831496 100644 --- a/server/config.ts +++ b/server/config.ts @@ -1,8 +1,8 @@ require('dotenv').config(); const defaults = { - REDIS_URL: '', // Optional, for room persistence and VM queueing (localhost:6379 for a local install) - DATABASE_URL: '', // Optional, for permanent rooms (localhost:5432 for a local install) + REDIS_URL: '', // Optional, for metrics + DATABASE_URL: '', // Optional, for permanent rooms and VBrowser management YOUTUBE_API_KEY: '', // Optional, provide one to enable searching YouTube NODE_ENV: '', // Usually, you should let process.env.NODE_ENV override this FIREBASE_ADMIN_SDK_CONFIG: '', // Optional, for features requiring sign-in/authentication diff --git a/server/room.ts b/server/room.ts index b7b6ff13ba..3798b9eeab 100644 --- a/server/room.ts +++ b/server/room.ts @@ -1,33 +1,19 @@ import config from './config'; import axios from 'axios'; -import Redis from 'ioredis'; import { Server, Socket } from 'socket.io'; -import { Client, QueryResult } from 'pg'; import { getUser, validateUserToken } from './utils/firebase'; -import { redisCount, redisCountDistinct } from './utils/redis'; +import { redis, redisCount, redisCountDistinct } from './utils/redis'; import { getIsSubscriberByEmail } from './utils/stripe'; import { AssignedVM } from './vm/base'; import { getStartOfDay } from './utils/time'; -import { updateObject, upsertObject } from './utils/postgres'; +import { postgres, updateObject, upsertObject } from './utils/postgres'; import { fetchYoutubeVideo, getYoutubeVideoID } from './utils/youtube'; import { v4 as uuidv4 } from 'uuid'; //@ts-ignore import twitch from 'twitch-m3u8'; - -let redis: Redis | undefined = undefined; -if (config.REDIS_URL) { - redis = new Redis(config.REDIS_URL); -} -let postgres: Client | undefined = undefined; -if (config.DATABASE_URL) { - postgres = new Client({ - connectionString: config.DATABASE_URL, - ssl: { rejectUnauthorized: false }, - }); - postgres.connect(); -} +import { QueryResult } from 'pg'; export class Room { // Serialized state @@ -343,9 +329,6 @@ export class Room { await redis.lpush('vBrowserSessionMS', Number(new Date()) - assignTime); await redis.ltrim('vBrowserSessionMS', 0, 24); } - if (redis && uid) { - await redis.del('vBrowserUIDLock:' + uid); - } try { await axios.post( 'http://localhost:' + config.VMWORKER_PORT + '/releaseVM', @@ -382,6 +365,10 @@ export class Room { if (data === '') { this.playlistNext(null); } + // The room video is changing so remove room from vbrowser queue + postgres?.query('DELETE FROM vbrowser_queue WHERE "roomId" = $1', [ + this.roomId, + ]); }; public addChatMessage = (socket: Socket | null, chatMsg: ChatMessageBase) => { @@ -880,24 +867,17 @@ export class Room { redis.expireat('vBrowserUIDs', expireTime); const uidMinutes = await redis.zincrby('vBrowserUIDMinutes', 1, uid); redis.expireat('vBrowserUIDMinutes', expireTime); - // TODO limit users based on these counts - - const uidLock = await redis.set( - 'vBrowserUIDLock:' + uid, - '1', - 'EX', - 70, - 'NX' - ); - if (!uidLock) { - socket.emit( - 'errorMessage', - 'There is already an active vBrowser for this user.' - ); - return; - } + // TODO limit users based on client or uid usage } } + // TODO (howard) check if the user or room has a VM already in postgres + if (false) { + socket.emit( + 'errorMessage', + 'There is already an active vBrowser for this user.' + ); + return; + } let isLarge = false; let region = config.DEFAULT_VM_REGION; if (data && data.uid && data.token) { @@ -940,16 +920,27 @@ export class Room { redisCount('vBrowserStarts'); this.cmdHost(socket, 'vbrowser://'); + // Put the room in the vbrowser queue + await postgres?.query('INSERT INTO vbrowser_queue VALUES ($1, $2)', [ + this.roomId, + new Date(), + ]); const assignmentResp = await axios.post( 'http://localhost:' + config.VMWORKER_PORT + '/assignVM', { isLarge, region, uid, + roomId: this.roomId, } ); const assignment: AssignedVM = assignmentResp.data; - if (this.video !== 'vbrowser://') { + const inQueue = await postgres?.query( + 'SELECT "roomId" FROM vbrowser_queue WHERE "roomId" = $1 LIMIT 1', + [this.roomId] + ); + if (!Boolean(inQueue?.rows.length)) { + // This room is no longer waiting for VM (maybe user gave up) return; } if (!assignment) { diff --git a/server/server.ts b/server/server.ts index 89d982beba..1221554572 100644 --- a/server/server.ts +++ b/server/server.ts @@ -5,13 +5,13 @@ import bodyParser from 'body-parser'; import compression from 'compression'; import os from 'os'; import cors from 'cors'; -import Redis from 'ioredis'; import https from 'https'; import http from 'http'; import { Server } from 'socket.io'; import { searchYoutube } from './utils/youtube'; import { Room } from './room'; import { + redis, getRedisCountDay, getRedisCountDayDistinct, redisCount, @@ -23,11 +23,10 @@ import { } from './utils/stripe'; import { deleteUser, validateUserToken } from './utils/firebase'; import path from 'path'; -import { Client } from 'pg'; import { getStartOfDay } from './utils/time'; import { getBgVMManagers, getSessionLimitSeconds } from './vm/utils'; import { hashString } from './utils/string'; -import { insertObject, upsertObject } from './utils/postgres'; +import { postgres, insertObject, upsertObject } from './utils/postgres'; import axios from 'axios'; import crypto from 'crypto'; import zlib from 'zlib'; @@ -52,7 +51,7 @@ if (process.env.NODE_ENV === 'development') { const gzip = util.promisify(zlib.gzip); const releaseInterval = 5 * 60 * 1000; -const releaseBatches = 10; +const releaseBatches = config.NODE_ENV === 'development' ? 1 : 10; const app = express(); let server: any = null; if (config.SSL_KEY_FILE && config.SSL_CRT_FILE) { @@ -63,18 +62,6 @@ if (config.SSL_KEY_FILE && config.SSL_CRT_FILE) { server = new http.Server(app); } const io = new Server(server, { cors: {}, transports: ['websocket'] }); -let redis: Redis | undefined = undefined; -if (config.REDIS_URL) { - redis = new Redis(config.REDIS_URL); -} -let postgres: Client | undefined = undefined; -if (config.DATABASE_URL) { - postgres = new Client({ - connectionString: config.DATABASE_URL, - ssl: { rejectUnauthorized: false }, - }); - postgres.connect(); -} const launchTime = Number(new Date()); const rooms = new Map(); @@ -148,7 +135,7 @@ app.post('/subtitle', async (req, res) => { .update(data, 'utf8') .digest() .toString('hex'); - let gzipData = (await gzip(data)) as Buffer; + let gzipData = await gzip(data); await redis.setex('subtitle:' + hash, 24 * 60 * 60, gzipData); redisCount('subUploads'); return res.json({ hash }); @@ -327,9 +314,9 @@ app.delete('/deleteAccount', async (req, res) => { } if (postgres) { // Delete rooms - await postgres?.query('DELETE FROM room WHERE owner = $1', [decoded.uid]); + await postgres.query('DELETE FROM room WHERE owner = $1', [decoded.uid]); // Delete linked accounts - await postgres?.query('DELETE FROM link_account WHERE uid = $1', [ + await postgres.query('DELETE FROM link_account WHERE uid = $1', [ decoded.uid, ]); } @@ -436,12 +423,12 @@ app.get('/linkAccount', async (req, res) => { } // Get the linked accounts for the user let linkAccounts: LinkAccount[] = []; - if (decoded?.uid) { - const result = await postgres?.query( + if (decoded?.uid && postgres) { + const { rows } = await postgres.query( 'SELECT kind, accountid, accountname, discriminator FROM link_account WHERE uid = $1', [decoded?.uid] ); - linkAccounts = result.rows; + linkAccounts = rows; } return res.json(linkAccounts); }); @@ -638,12 +625,11 @@ async function minuteMetrics() { for (let i = 0; i < roomArr.length; i++) { const room = roomArr[i]; if (room.vBrowser && room.vBrowser.id) { - // Renew the locks - await redis?.expire( - 'lock:' + room.vBrowser.provider + ':' + room.vBrowser.id, - 300 + // Update the heartbeat + await postgres?.query( + `UPDATE vbrowser SET "heartbeatTime" = $1 WHERE "roomId" = $2 and vmid = $3`, + [new Date(), room.roomId, room.vBrowser.id] ); - await redis?.expire('vBrowserUIDLock:' + room.vBrowser?.creatorUID, 70); const expireTime = getStartOfDay() / 1000 + 86400; if (room.vBrowser?.creatorClientID) { @@ -804,7 +790,10 @@ async function getStats() { const currentMemUsage = [process.memoryUsage().rss]; // Singleton stats below (same for all shards so don't combine) - let vBrowserWaiting = Number(await redis?.get('currentVBrowserWaiting')); + let vBrowserWaiting = Number( + (await postgres?.query('SELECT count(1) FROM vbrowser_queue'))?.rows[0] + ?.count + ); const cpuUsage = os.loadavg(); const redisUsage = Number( (await redis?.info()) diff --git a/server/syncSubs.ts b/server/syncSubs.ts index a0db5d2c82..87d568fcff 100644 --- a/server/syncSubs.ts +++ b/server/syncSubs.ts @@ -1,18 +1,13 @@ -import { Client } from 'pg'; import config from './config'; import { getUserByEmail } from './utils/firebase'; -import { insertObject, updateObject } from './utils/postgres'; +import { insertObject, newPostgres, updateObject } from './utils/postgres'; import { getAllActiveSubscriptions, getAllCustomers } from './utils/stripe'; import { Client as DiscordClient, IntentsBitField } from 'discord.js'; let lastSubs = ''; let currentSubs = ''; -const postgres2 = new Client({ - connectionString: config.DATABASE_URL, - ssl: { rejectUnauthorized: false }, -}); -postgres2.connect(); +const postgres2 = newPostgres(); // set up the Discord admin bot const discordBot = new DiscordClient({ diff --git a/server/timeSeries.ts b/server/timeSeries.ts index a4ce3f3ddd..a06705b83b 100644 --- a/server/timeSeries.ts +++ b/server/timeSeries.ts @@ -1,12 +1,7 @@ import config from './config'; -import Redis from 'ioredis'; import { statsAgg } from './utils/statsAgg'; import axios from 'axios'; - -let redis: Redis | undefined = undefined; -if (config.REDIS_URL) { - redis = new Redis(config.REDIS_URL); -} +import { redis } from './utils/redis'; statsTimeSeries(); setInterval(statsTimeSeries, 5 * 60 * 1000); diff --git a/server/utils/postgres.ts b/server/utils/postgres.ts index a98beffef7..a21f2f2e70 100644 --- a/server/utils/postgres.ts +++ b/server/utils/postgres.ts @@ -1,4 +1,31 @@ import { Client, QueryResult } from 'pg'; +import config from '../config'; + +export let postgres: Client | undefined = undefined; +if (config.DATABASE_URL) { + postgres = new Client({ + connectionString: config.DATABASE_URL, + ssl: { rejectUnauthorized: false }, + }); + postgres.connect(); +} + +/** + * Use this if we need a new connection instead of sharing. + * Guarantees we'll return a client because we throw if we don't have it configured + * @returns + */ +export function newPostgres() { + if (!config.DATABASE_URL) { + throw new Error('postgres not configured'); + } + const postgres = new Client({ + connectionString: config.DATABASE_URL, + ssl: { rejectUnauthorized: false }, + }); + postgres.connect(); + return postgres; +} export async function updateObject( postgres: Client, diff --git a/server/utils/redis.ts b/server/utils/redis.ts index 42703f5287..3c47717047 100644 --- a/server/utils/redis.ts +++ b/server/utils/redis.ts @@ -2,7 +2,7 @@ import config from '../config'; import Redis from 'ioredis'; import { getStartOfHour } from './time'; -let redis: Redis | undefined = undefined; +export let redis: Redis | undefined = undefined; if (config.REDIS_URL) { redis = new Redis(config.REDIS_URL); } diff --git a/server/vm/base.ts b/server/vm/base.ts index cd1c696507..8dbeea4721 100644 --- a/server/vm/base.ts +++ b/server/vm/base.ts @@ -1,25 +1,18 @@ import config from '../config'; -import Redis from 'ioredis'; import axios from 'axios'; import { v4 as uuidv4 } from 'uuid'; -import { redisCount } from '../utils/redis'; +import { redis, redisCount } from '../utils/redis'; +import { newPostgres } from '../utils/postgres'; import { PoolConfig, PoolRegion } from './utils'; - -let redis: Redis | undefined = undefined; -if (config.REDIS_URL) { - redis = new Redis(config.REDIS_URL); -} - const incrInterval = 5 * 1000; const decrInterval = 30 * 1000; const cleanupInterval = 5 * 60 * 1000; -const updateSizeInterval = 60 * 1000; + +const postgres = newPostgres(); export abstract class VMManager { protected isLarge = false; protected region: PoolRegion = 'US'; - protected redis: Redis; - private currentSize = 0; private limitSize = 0; private minSize = 0; @@ -28,10 +21,6 @@ export abstract class VMManager { this.region = region; this.limitSize = Number(limitSize); this.minSize = Number(minSize); - if (!redis) { - throw new Error('Cannot construct VMManager without Redis'); - } - this.redis = redis; } public getIsLarge = () => { @@ -54,8 +43,12 @@ export abstract class VMManager { return this.limitSize * 0.05; }; - public getCurrentSize = () => { - return this.currentSize; + public getCurrentSize = async () => { + const { rows } = await postgres.query( + `SELECT count(1) FROM vbrowser WHERE pool = $1`, + [this.getPoolName()] + ); + return Number(rows[0]?.count); }; public getPoolName = () => { @@ -84,34 +77,36 @@ export abstract class VMManager { return [Math.ceil(minBuffer), Math.ceil(minBuffer * 1.5)]; }; - public getRedisQueueKey = () => { - return ( - 'availableList' + this.id + this.region + (this.isLarge ? 'Large' : '') + public getAvailableCount = async (): Promise => { + const { rows } = await postgres.query( + `SELECT count(1) FROM vbrowser WHERE pool = $1 and state = 'available'`, + [this.getPoolName()] ); + return Number(rows[0]?.count); }; - public getRedisStagingKey = () => { - return ( - 'stagingList' + this.id + this.region + (this.isLarge ? 'Large' : '') + public getStagingCount = async (): Promise => { + const { rows } = await postgres.query( + `SELECT count(1) FROM vbrowser WHERE pool = $1 and state = 'staging'`, + [this.getPoolName()] ); + return Number(rows[0]?.count); }; - public getRedisAllKey = () => { - return 'allList' + this.id + this.region + (this.isLarge ? 'Large' : ''); - }; - - public getRedisHostCacheKey = () => { - return 'hostCache' + this.id + this.region + (this.isLarge ? 'Large' : ''); - }; - - public getRedisPoolSizeKey = () => { - return 'vmPoolFull' + this.id + this.region + (this.isLarge ? 'Large' : ''); + public getAvailableVBrowsers = async (): Promise => { + const { rows } = await postgres.query( + `SELECT vmid from vbrowser WHERE pool = $1 and state = 'available'`, + [this.getPoolName()] + ); + return rows.map((row: any) => row.vmid); }; - public getRedisTerminationKey = () => { - return ( - 'terminationList' + this.id + this.region + (this.isLarge ? 'Large' : '') + public getStagingVBrowsers = async (): Promise => { + const { rows } = await postgres.query( + `SELECT vmid from vbrowser WHERE pool = $1 and state = 'staging'`, + [this.getPoolName()] ); + return rows.map((row: any) => row.vmid); }; public getTag = () => { @@ -122,16 +117,123 @@ export abstract class VMManager { ); }; - public resetVM = async (id: string): Promise => { + public assignVM = async ( + roomId: string, + uid: string + ): Promise => { + if (!roomId || !uid) { + return undefined; + } + let postgres2 = newPostgres(); + await postgres2.query('BEGIN TRANSACTION'); + try { + const assignStart = Number(new Date()); + if (this.getMinSize() === 0) { + // Spawns a VM if none is available in the pool + const availableCount = await this.getAvailableCount(); + if (!availableCount) { + await this.startVMWrapper(); + } + } + // Update and use SKIP LOCKED to ensure each consumer only gets one + const getAssignedVM = async (): Promise => { + const { rows } = await postgres2.query( + ` + UPDATE vbrowser + SET "roomId" = $1, uid = $2, "heartbeatTime" = $3, "assignTime" = $4, state = 'used' + WHERE id = ( + SELECT id + FROM vbrowser + WHERE state = 'available' + AND pool = $5 + ORDER BY id ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + ) + RETURNING data`, + [roomId, uid, new Date(), new Date(), this.getPoolName()] + ); + return rows[0]?.data; + }; + let selected: VM | undefined = undefined; + while (!selected) { + // make sure this room still wants a VM, otherwise rollback the transaction to avoid waste + const inQueue = await postgres2.query( + 'SELECT "roomId" FROM vbrowser_queue WHERE "roomId" = $1 LIMIT 1', + [roomId] + ); + if (!Boolean(inQueue.rows.length)) { + await postgres2.query('ROLLBACK'); + await postgres2.end(); + console.log('[ASSIGN] room %s no longer in queue', roomId); + return undefined; + } + selected = await getAssignedVM(); + if (!selected) { + // Wait and try again + await new Promise((resolve) => setTimeout(resolve, 1000)); + } + } + const assignEnd = Number(new Date()); + const assignElapsed = assignEnd - assignStart; + await redis?.lpush('vBrowserStartMS', assignElapsed); + await redis?.ltrim('vBrowserStartMS', 0, 24); + console.log( + '[ASSIGN] %s to %s in %s', + selected.id, + roomId, + assignElapsed + 'ms' + ); + const retVal = { ...selected, assignTime: Number(new Date()) }; + await postgres2.query('COMMIT'); + await postgres2.end(); + return retVal; + } catch (e) { + console.warn(e); + await postgres2.query('ROLLBACK'); + await postgres2.end(); + return undefined; + } + }; + + public resetVM = async (vmid: string, uid?: string): Promise => { + if (uid !== undefined) { + // verify the uid matches if user initiated + const vmUid = await postgres.query( + `SELECT uid FROM vbrowser WHERE pool = $1 AND vmid = $2`, + [this.getPoolName(), vmid] + ); + if (vmUid.rows[0]?.uid && vmUid.rows[0]?.uid !== uid) { + console.log( + '[RESET] uid mismatch on %s, expected %s, got %s', + vmid, + vmUid.rows[0]?.uid, + uid + ); + return; + } + } // We can attempt to reuse the instance which is more efficient if users tend to use them for a short time // Otherwise terminating them is simpler but more expensive since they're billed for an hour - console.log('[RESET]', id); - await this.rebootVM(id); - // Delete any locks/caches - await this.redis.del('lock:' + this.id + ':' + id); - await this.redis.del(this.getRedisHostCacheKey() + ':' + id); - // Add the VM back to the pool - await this.redis.rpush(this.getRedisStagingKey(), id); + console.log('[RESET]', vmid); + await this.rebootVM(vmid); + const { rowCount } = await postgres.query( + ` + UPDATE vbrowser + SET "roomId" = NULL, uid = NULL, retries = 0, "heartbeatTime" = NULL, "resetTime" = $3, "readyTime" = NULL, "assignTime" = NULL, data = NULL, state = 'staging' + WHERE pool = $1 AND vmid = $2`, + [this.getPoolName(), vmid, new Date()] + ); + console.log('UPDATE', rowCount); + if (rowCount === 0) { + // terminate if we don't have a record of it + // This could happen while we're migrating and don't have records yet + // Or if resetting a VM from cleanup that we didn't record in db + // Or if Docker terminated the VM in reboot already since we don't reuse containers + // Of if we resized down but didn't complete the termination + // In the Docker case that leads to a double terminate but might be ok + this.terminateVMWrapper(vmid); + } }; public startVMWrapper = async () => { @@ -139,7 +241,12 @@ export abstract class VMManager { try { const password = uuidv4(); const id = await this.startVM(password); - await this.redis.rpush(this.getRedisStagingKey(), id); + await postgres.query( + ` + INSERT INTO vbrowser(pool, vmid, "creationTime", state) + VALUES($1, $2, $3, 'staging')`, + [this.getPoolName(), id, new Date()] + ); redisCount('vBrowserLaunches'); return id; } catch (e: any) { @@ -152,344 +259,285 @@ export abstract class VMManager { } }; - protected terminateVMWrapper = async (id: string) => { - console.log('[TERMINATE]', id); - // Remove from lists, if it exists - await this.redis.lrem(this.getRedisQueueKey(), 0, id); - await this.redis.lrem(this.getRedisStagingKey(), 0, id); - // Get the VM data to calculate lifetime, if we fail do the terminate anyway - // const lifetime = await this.terminateVMMetrics(id); - await this.terminateVM(id); - // if (lifetime) { - // await this.redis.lpush('vBrowserVMLifetime', lifetime); - // await this.redis.ltrim('vBrowserVMLifetime', 0, 24); - // } - // Delete any locks - await this.redis.del('lock:' + this.id + ':' + id); - await this.redis.del(this.getRedisHostCacheKey() + ':' + id); - }; - - protected terminateVMMetrics = async (id: string) => { - try { - const vm = await this.getVM(id); - if (vm) { - const lifetime = - Number(new Date()) - Number(new Date(vm.creation_date)); - return lifetime; - } - } catch (e: any) { - console.warn(e.response?.data); - } - return 0; + protected terminateVMWrapper = async (vmid: string) => { + console.log('[TERMINATE]', vmid); + await this.terminateVM(vmid); + const { rowCount } = await postgres.query( + `DELETE FROM vbrowser WHERE pool = $1 AND vmid = $2 RETURNING id`, + [this.getPoolName(), vmid] + ); + // We can log the VM lifetime here if desired + console.log('DELETE', rowCount); }; public runBackgroundJobs = async () => { - try { - console.log( - '[VMWORKER] starting background jobs for %s', - this.getPoolName() - ); - const resizeVMGroupIncr = async () => { - const availableCount = await this.redis.llen(this.getRedisQueueKey()); - const stagingCount = await this.redis.llen(this.getRedisStagingKey()); - let launch = false; - launch = - availableCount + stagingCount < this.getAdjustedBuffer()[0] && - this.getCurrentSize() != null && - this.getCurrentSize() < (this.getLimitSize() || Infinity); - if (launch) { + const resizeVMGroupIncr = async () => { + const availableCount = await this.getAvailableCount(); + const stagingCount = await this.getStagingCount(); + const currentSize = await this.getCurrentSize(); + let launch = false; + launch = + availableCount + stagingCount < this.getAdjustedBuffer()[0] && + currentSize < (this.getLimitSize() || Infinity); + if (launch) { + console.log( + '[RESIZE-LAUNCH]', + 'minimum:', + this.getAdjustedBuffer()[0], + 'available:', + availableCount, + 'staging:', + stagingCount, + 'currentSize:', + currentSize, + 'limit:', + this.getLimitSize() + ); + this.startVMWrapper(); + } + }; + + const resizeVMGroupDecr = async () => { + let unlaunch = false; + const availableCount = await this.getAvailableCount(); + unlaunch = availableCount > this.getAdjustedBuffer()[1]; + if (unlaunch) { + // use SKIP LOCKED to delete to avoid deleting VM that might be assigning + // filter to only VMs eligible for deletion + // they must be up for long enough + // keep the oldest min pool size number of VMs + const { rows } = await postgres.query( + ` + DELETE FROM vbrowser + WHERE id = ( + SELECT id + FROM vbrowser + WHERE pool = $1 + AND state = 'available' + AND CAST(extract(epoch from now() - "creationTime") as INT) % (60 * 60) > $2 + ORDER BY id ASC + FOR UPDATE SKIP LOCKED + LIMIT 1 + OFFSET $3 + ) RETURNING vmid, CAST(extract(epoch from now() - "creationTime") as INT) % (60 * 60) as uptime_frac`, + [ + this.getPoolName(), + config.VM_MIN_UPTIME_MINUTES * 60, // to seconds + this.getMinSize(), + ] + ); + const first = rows[0]; + if (first) { console.log( - '[RESIZE-LAUNCH]', - 'minimum:', - this.getAdjustedBuffer()[0], - 'available:', - availableCount, - 'staging:', - stagingCount, - 'currentSize:', - this.getCurrentSize(), - 'limit:', - this.getLimitSize() + '[RESIZE-UNLAUNCH] %s up for %s seconds of hour', + first.vmid, + first.uptime_frac ); - this.startVMWrapper(); + await this.terminateVMWrapper(first.vmid); } - }; - - const resizeVMGroupDecr = async () => { - let unlaunch = false; - const availableCount = await this.redis.llen(this.getRedisQueueKey()); - unlaunch = availableCount > this.getAdjustedBuffer()[1]; - if (unlaunch) { - const ids = await this.redis.smembers(this.getRedisTerminationKey()); - // Remove the first available VM - let first = null; - let rem = 0; - while (ids.length && !rem) { - first = ids.shift(); - rem = first - ? await this.redis.lrem(this.getRedisQueueKey(), 1, first) - : 0; - if (first && rem) { - console.log('[RESIZE-UNLAUNCH]', first); - await this.terminateVMWrapper(first); - } + } + }; + + const cleanupVMGroup = async () => { + // Clean up hanging VMs + // It's possible we created a VM but lost track of it + // Take the list of VMs from API + // subtract VMs that have a heartbeat or are in the available or staging pool + // delete the rest + let allVMs = []; + try { + allVMs = await this.listVMs(this.getTag()); + } catch (e) { + console.log('cleanupVMGroup: failed to fetch VM list'); + return; + } + const { rows } = await postgres.query( + ` + SELECT vmid from vbrowser + WHERE pool = $1 + AND + ("heartbeatTime" > (NOW() - INTERVAL '5 minutes') + OR state = 'staging' + OR state = 'available') + `, + [this.getPoolName()] + ); + const dontDelete = new Set(rows.map((row: any) => row.vmid)); + console.log( + '[CLEANUP] %s: found %s VMs, %s to keep', + this.getPoolName(), + allVMs.length, + dontDelete.size + ); + for (let i = 0; i < allVMs.length; i++) { + const server = allVMs[i]; + if (!dontDelete.has(server.id)) { + console.log('[CLEANUP]', server.id); + try { + await this.resetVM(server.id); + //this.terminateVMWrapper(server.id); + } catch (e: any) { + console.warn(e.response?.data); } + await new Promise((resolve) => setTimeout(resolve, 2000)); } - }; + } + }; - const updateSize = async () => { - const allVMs = await this.listVMs(this.getTag()); - const now = Date.now(); - this.currentSize = allVMs.length; - await this.redis.setex( - this.getRedisPoolSizeKey(), - 2 * 60, - allVMs.length - ); - let sortedVMs = allVMs - // Sort newest first (decreasing alphabetically) - .sort((a, b) => -a.creation_date?.localeCompare(b.creation_date)) - // Remove the minimum number of VMs to keep - .slice(0, -this.getMinSize() || undefined) - // Consider only VMs that have been up for most of an hour - .filter( - (vm) => - (now - Number(new Date(vm.creation_date))) % (60 * 60 * 1000) > - config.VM_MIN_UPTIME_MINUTES * 60 * 1000 - ); - const cmd = this.redis.multi().del(this.getRedisTerminationKey()); - if (sortedVMs.length) { - cmd.sadd( - this.getRedisTerminationKey(), - sortedVMs.map((vm) => vm.id) - ); - } - await cmd.exec(); - if (allVMs.length) { - const availableKeys = await this.redis.lrange( - this.getRedisQueueKey(), - 0, - -1 - ); - const stagingKeys = await this.redis.lrange( - this.getRedisStagingKey(), - 0, - -1 - ); - console.log( - '[STATS] %s: currentSize %s, available %s, staging %s, buffer %s', - this.getPoolName(), - allVMs.length, - availableKeys.length, - stagingKeys.length, - this.getAdjustedBuffer() + const checkStaging = async () => { + const staging = await postgres.query( + `SELECT id FROM vbrowser WHERE pool = $1 and state = 'staging'`, + [this.getPoolName()] + ); + const stagingPromises = staging.rows.map((row: any) => { + return new Promise(async (resolve, reject) => { + const rowid = row.id; + // Increment retry count and return data + const { rows } = await postgres.query( + ` + UPDATE vbrowser + SET retries = retries + 1 WHERE id = $1 + RETURNING vmid, data, retries + `, + [rowid] ); - } - }; - - const cleanupVMGroup = async () => { - // Clean up hanging VMs - // It's possible we created a VM but lost track of it in redis - // Take the list of VMs from API, subtract VMs that have a lock in redis or are in the available or staging pool, delete the rest - let allVMs = []; - try { - allVMs = await this.listVMs(this.getTag()); - } catch (e) { - console.log('cleanupVMGroup: failed to fetch VM list'); - return; - } - const usedKeys: string[] = []; - for (let i = 0; i < allVMs.length; i++) { - if (await this.redis.get(`lock:${this.id}:${allVMs[i].id}`)) { - usedKeys.push(allVMs[i].id); + const first = rows[0]; + if (!first) { + return reject('row not found for id ' + rowid); } - } - const availableKeys = await this.redis.lrange( - this.getRedisQueueKey(), - 0, - -1 - ); - const stagingKeys = await this.redis.lrange( - this.getRedisStagingKey(), - 0, - -1 - ); - const dontDelete = new Set([ - ...usedKeys, - ...availableKeys, - ...stagingKeys, - ]); - console.log( - '[CLEANUP] %s: cleanup %s VMs', - this.getPoolName(), - allVMs.length - dontDelete.size - ); - for (let i = 0; i < allVMs.length; i++) { - const server = allVMs[i]; - if (!dontDelete.has(server.id)) { - console.log('[CLEANUP]', server.id); + let vmid = first.vmid as string; + let retryCount = first.retries as number; + let vm = first.data as VM | null; + if (retryCount < this.minRetries) { + if (config.NODE_ENV === 'development') { + console.log( + '[CHECKSTAGING] attempt %s, waiting for minRetries', + retryCount + ); + } + // Do a minimum # of retries to give reboot time + return resolve(vmid + ', ' + retryCount + ', ' + false); + } + let ready = false; + // Fetch data on first attempt + // Try again only every once in a while to reduce load on cloud API + const shouldFetchVM = + retryCount === this.minRetries + 1 || retryCount % 20 === 0; + if (!vm && shouldFetchVM) { try { - await this.resetVM(server.id); + vm = await this.getVM(vmid); } catch (e: any) { console.warn(e.response?.data); + if (e.response?.status === 404) { + // Remove the VM beecause the provider says it doesn't exist + await postgres.query('DELETE FROM vbrowser WHERE id = $1', [ + rowid, + ]); + return reject('failed to find vm ' + vmid); + } } - //this.terminateVMWrapper(server.id); - await new Promise((resolve) => setTimeout(resolve, 2000)); - } - } - }; - - const checkStaging = async () => { - try { - // Get staging list and check if VM is ready - const stagingKeys = await this.redis.lrange( - this.getRedisStagingKey(), - 0, - -1 - ); - const stagingPromises = stagingKeys.map((id) => { - return new Promise(async (resolve, reject) => { - const retryCount = await this.redis.incr( - this.getRedisStagingKey() + ':' + id + if (vm?.host) { + // Save the VM data + await postgres.query( + `UPDATE vbrowser SET data = $1 WHERE id = $2`, + [vm, rowid] ); - if (retryCount < this.minRetries) { - // Do a minimum # of retries to give reboot time - return resolve(id + ', ' + retryCount + ', ' + false); - } - let ready = false; - let vm: VM | null = null; - try { - const cached = await this.redis.get( - this.getRedisHostCacheKey() + ':' + id + } + } + if (!vm?.host) { + console.log('[CHECKSTAGING] no host for vm %s', vmid); + return reject('no host for vm ' + vmid); + } + ready = await checkVMReady(vm.host); + if (ready) { + console.log('[CHECKSTAGING] ready:', vmid, vm?.host, retryCount); + await postgres.query( + `UPDATE vbrowser SET state = 'available', "readyTime" = $2 WHERE id = $1`, + [rowid, new Date()] + ); + await redis?.lpush('vBrowserStageRetries', retryCount); + await redis?.ltrim('vBrowserStageRetries', 0, 24); + } else { + if (retryCount >= 240) { + console.log('[CHECKSTAGING]', 'giving up:', vmid); + redisCount('vBrowserStagingFails'); + await redis?.lpush('vBrowserStageFails', vmid); + await redis?.ltrim('vBrowserStageFails', 0, 24); + await this.resetVM(vmid); + // await this.terminateVMWrapper(id); + } else { + if (retryCount % 150 === 0) { + console.log( + '[CHECKSTAGING] %s attempt to poweron, attach to network', + vmid ); - vm = cached ? JSON.parse(cached) : null; - if ( - !vm && - (retryCount === this.minRetries + 1 || retryCount % 20 === 0) - ) { - vm = await this.getVM(id); - if (vm?.host) { - console.log( - '[CHECKSTAGING] caching host %s for id %s', - vm?.host, - id - ); - await this.redis.setex( - this.getRedisHostCacheKey() + ':' + id, - 3600, - JSON.stringify(vm) - ); - } - } - } catch (e: any) { - console.warn(e.response?.data); - if (e.response?.status === 404) { - await this.redis.lrem(this.getRedisQueueKey(), 0, id); - await this.redis.lrem(this.getRedisStagingKey(), 0, id); - await this.redis.del(this.getRedisStagingKey() + ':' + id); - return reject(); - } - } - if (!vm?.host) { - console.log('[CHECKSTAGING] no host for vm %s', id); - return reject(); + this.powerOn(vmid); + //this.attachToNetwork(id); } - ready = await checkVMReady(vm?.host); - //ready = retryCount > 100; - if (ready) { - console.log('[CHECKSTAGING] ready:', id, vm?.host, retryCount); - // If it is, move it to available list - const rem = await this.redis.lrem( - this.getRedisStagingKey(), - 1, - id + if ( + retryCount % (config.NODE_ENV === 'development' ? 1 : 30) === + 0 + ) { + console.log( + '[CHECKSTAGING]', + 'not ready:', + vmid, + vm.host, + retryCount ); - if (rem) { - await this.redis - .multi() - .rpush(this.getRedisQueueKey(), id) - .del(this.getRedisStagingKey() + ':' + id) - .exec(); - await this.redis.lpush('vBrowserStageRetries', retryCount); - await this.redis.ltrim('vBrowserStageRetries', 0, 24); - } - } else { - if (retryCount >= 240) { - console.log('[CHECKSTAGING]', 'giving up:', id); - await this.redis - .multi() - .lrem(this.getRedisStagingKey(), 0, id) - .del(this.getRedisStagingKey() + ':' + id) - .exec(); - redisCount('vBrowserStagingFails'); - await this.redis.lpush('vBrowserStageFails', id); - await this.redis.ltrim('vBrowserStageFails', 0, 24); - await this.resetVM(id); - // await this.terminateVMWrapper(id); - } else { - if (retryCount % 150 === 0) { - console.log( - '[CHECKSTAGING] %s attempt to poweron, attach to network', - id - ); - this.powerOn(id); - //this.attachToNetwork(id); - } - if (retryCount % 30 === 0) { - console.log( - '[CHECKSTAGING]', - 'not ready:', - id, - vm?.host, - retryCount - ); - } - } } - resolve(id + ', ' + retryCount + ', ' + ready); - }); - }); - // console.time('[CHECKSTAGING] ' + this.getPoolName()); - const result = await Promise.race([ - Promise.allSettled(stagingPromises), - new Promise((resolve) => setTimeout(resolve, 30000)), - ]); - // console.timeEnd('[CHECKSTAGING] ' + this.getPoolName()); - return result; - } catch (e) { - console.warn('[CHECKSTAGING-ERROR]', e); - return []; - } - }; - - setInterval(resizeVMGroupIncr, incrInterval); - setInterval(resizeVMGroupDecr, decrInterval); + } + } + resolve(vmid + ', ' + retryCount + ', ' + ready); + }); + }); + // TODO log something if we timeout + const result = await Promise.race([ + Promise.allSettled(stagingPromises), + new Promise((resolve) => setTimeout(resolve, 30000)), + ]); + return result; + }; + + console.log( + '[VMWORKER] starting background jobs for %s', + this.getPoolName() + ); - updateSize(); - setInterval(updateSize, updateSizeInterval); + setInterval(resizeVMGroupIncr, incrInterval); + setInterval(resizeVMGroupDecr, decrInterval); + setInterval(async () => { + console.log( + '[STATS] %s: currentSize %s, available %s, staging %s, buffer %s', + this.getPoolName(), + await this.getCurrentSize(), + await this.getAvailableCount(), + await this.getStagingCount(), + this.getAdjustedBuffer() + ); + }, 10000); - setImmediate(async () => { - while (true) { - try { - await cleanupVMGroup(); - } catch (e: any) { - console.log('error in cleanupVMGroup'); - console.warn(e.response?.data); - } - await new Promise((resolve) => setTimeout(resolve, cleanupInterval)); + setImmediate(async () => { + while (true) { + try { + await cleanupVMGroup(); + } catch (e: any) { + console.warn('[CLEANUPVMGROUP-ERROR]', e.response?.data); } - }); + await new Promise((resolve) => setTimeout(resolve, cleanupInterval)); + } + }); - const checkStagingInterval = 1000; + setImmediate(async () => { while (true) { - await new Promise((resolve) => - setTimeout(resolve, checkStagingInterval) - ); - await checkStaging(); + try { + await checkStaging(); + } catch (e) { + console.warn('[CHECKSTAGING-ERROR]', e); + } + await new Promise((resolve) => setTimeout(resolve, 1000)); } - } catch (e) { - console.log('error in runBackgroundJobs:', e); - } + }); }; public abstract id: string; diff --git a/server/vm/docker.ts b/server/vm/docker.ts index 787299e7df..4d24940d6b 100644 --- a/server/vm/docker.ts +++ b/server/vm/docker.ts @@ -1,7 +1,6 @@ // This assumes an installation of Docker exists at the Docker VM host // and that host is configured to accept our SSH key import config from '../config'; -import { v4 as uuidv4 } from 'uuid'; import { VMManager, VM } from './base'; import { imageName } from './utils'; //@ts-ignore @@ -62,12 +61,8 @@ export class Docker extends VMManager { }; rebootVM = async (id: string) => { - return await this.terminateVM(id); - }; - - // Override the base method, since we don't need to reuse docker containers - resetVM = async (id: string) => { - return await this.terminateVM(id); + // We don't need to reuse Docker containers + return await this.terminateVMWrapper(id); }; getVM = async (id: string) => { diff --git a/server/vm/hetzner.ts b/server/vm/hetzner.ts index ac82a0cab1..28d004ba61 100644 --- a/server/vm/hetzner.ts +++ b/server/vm/hetzner.ts @@ -3,6 +3,7 @@ import axios from 'axios'; import { v4 as uuidv4 } from 'uuid'; import { VMManager, VM } from './base'; import fs from 'fs'; +import { redis } from '../utils/redis'; const HETZNER_TOKEN = config.HETZNER_TOKEN; const sshKeys = config.HETZNER_SSH_KEYS.split(',').map(Number); @@ -27,6 +28,26 @@ export class Hetzner extends VMManager { } startVM = async (name: string) => { + const data = { + name: name, + server_type: this.isLarge ? this.largeSize : this.size, + start_after_create: true, + image: imageId, + ssh_keys: sshKeys, + public_net: { + enable_ipv4: true, + enable_ipv6: false, + }, + // networks: [ + // this.networks[Math.floor(Math.random() * this.networks.length)], + // ], + // user_data: `replace with vbrowser.sh startup script if we want to boot vbrowser on instance creation (won't trigger on rebuild/restart)` + labels: { + [this.getTag()]: '1', + originalName: name, + }, + location: this.getRandomDatacenter(), + }; const response = await axios({ method: 'POST', url: `https://api.hetzner.cloud/v1/servers`, @@ -34,26 +55,7 @@ export class Hetzner extends VMManager { Authorization: 'Bearer ' + HETZNER_TOKEN, 'Content-Type': 'application/json', }, - data: { - name: name, - server_type: this.isLarge ? this.largeSize : this.size, - start_after_create: true, - image: imageId, - ssh_keys: sshKeys, - public_net: { - enable_ipv4: true, - enable_ipv6: false, - }, - // networks: [ - // this.networks[Math.floor(Math.random() * this.networks.length)], - // ], - // user_data: `replace with vbrowser.sh startup script if we want to boot vbrowser on instance creation (won't trigger on rebuild/restart)` - labels: { - [this.getTag()]: '1', - originalName: name, - }, - location: this.getRandomDatacenter(), - }, + data, }); const id = response.data.server.id; return id; @@ -115,10 +117,7 @@ export class Hetzner extends VMManager { id, response?.headers['ratelimit-remaining'] ); - this.redis?.set( - 'hetznerApiRemaining', - response?.headers['ratelimit-remaining'] - ); + redis?.set('hetznerApiRemaining', response?.headers['ratelimit-remaining']); if (response.data.server.private_net?.length > 1) { console.log('[WARNING] %s has more than one private network', id); } @@ -247,6 +246,9 @@ export class Hetzner extends VMManager { const public_ip = server.public_net?.ipv4?.ip; // const private_ip = server.private_net?.[0]?.ip; const ip = public_ip; + // We can use either the public or private IP for communicating between gateway and VM + // Only signaling traffic goes through here since the video is transmitted over WebRTC + // The private IP requires the server and gateway to be on the same network and there is a limit to the number of servers allowed return { id: server.id?.toString(), pass: server.name, diff --git a/server/vm/utils.ts b/server/vm/utils.ts index 23b44677a2..0f1d396f76 100644 --- a/server/vm/utils.ts +++ b/server/vm/utils.ts @@ -1,5 +1,4 @@ import { AssignedVM, VMManager } from './base'; -import Redis from 'ioredis'; import config from '../config'; import { Scaleway } from './scaleway'; import { Hetzner } from './hetzner'; @@ -9,64 +8,6 @@ import { Docker } from './docker'; // Chromium on ARM: ghcr.io/howardchung/vbrowser/arm-chromium export const imageName = 'howardc93/vbrowser'; -export const assignVM = async ( - redis: Redis, - vmManager: VMManager -): Promise => { - try { - const assignStart = Number(new Date()); - let selected = null; - while (!selected) { - if (vmManager.getMinSize() === 0) { - // This code spawns a VM if none is available in the pool - const availableCount = await redis.llen(vmManager.getRedisQueueKey()); - if (!availableCount) { - await vmManager.startVMWrapper(); - } - } - let resp = await redis.blpop( - vmManager.getRedisQueueKey(), - config.VM_ASSIGNMENT_TIMEOUT - ); - if (!resp) { - return undefined; - } - const id = resp[1]; - console.log('[ASSIGN]', id); - const lock = await redis.set( - 'lock:' + vmManager.id + ':' + id, - '1', - 'EX', - 300, - 'NX' - ); - if (!lock) { - console.log('failed to acquire lock on VM:', id); - continue; - } - const cachedData = await redis.get( - vmManager.getRedisHostCacheKey() + ':' + id - ); - let candidate = - cachedData && cachedData.startsWith('{') && JSON.parse(cachedData); - if (!candidate) { - candidate = await vmManager.getVM(id); - } - selected = candidate; - } - const assignEnd = Number(new Date()); - const assignElapsed = assignEnd - assignStart; - await redis.lpush('vBrowserStartMS', assignElapsed); - await redis.ltrim('vBrowserStartMS', 0, 24); - console.log('[ASSIGN]', selected.id, assignElapsed + 'ms'); - const retVal = { ...selected, assignTime: Number(new Date()) }; - return retVal; - } catch (e) { - console.warn(e); - return undefined; - } -}; - export type PoolRegion = 'US' | 'USW' | 'EU'; export type PoolConfig = { provider: string; @@ -79,29 +20,28 @@ export type PoolConfig = { function createVMManager(poolConfig: PoolConfig): VMManager | null { let vmManager: VMManager | null = null; if ( - config.REDIS_URL && config.SCW_SECRET_KEY && config.SCW_ORGANIZATION_ID && + config.SCW_IMAGE && + config.SCW_GATEWAY && poolConfig.provider === 'Scaleway' ) { vmManager = new Scaleway(poolConfig); } else if ( - config.REDIS_URL && config.HETZNER_TOKEN && + config.HETZNER_IMAGE && + config.HETZNER_GATEWAY && poolConfig.provider === 'Hetzner' ) { vmManager = new Hetzner(poolConfig); } else if ( - config.REDIS_URL && config.DO_TOKEN && + config.DO_IMAGE && + config.DO_GATEWAY && poolConfig.provider === 'DO' ) { vmManager = new DigitalOcean(poolConfig); - } else if ( - config.REDIS_URL && - config.DOCKER_VM_HOST && - poolConfig.provider === 'Docker' - ) { + } else if (config.DOCKER_VM_HOST && poolConfig.provider === 'Docker') { vmManager = new Docker(poolConfig); } return vmManager; diff --git a/server/vmWorker.ts b/server/vmWorker.ts index bf27354937..03f295812c 100644 --- a/server/vmWorker.ts +++ b/server/vmWorker.ts @@ -1,16 +1,10 @@ import config from './config'; -import { assignVM, getBgVMManagers } from './vm/utils'; +import { getBgVMManagers } from './vm/utils'; import express from 'express'; -import Redis from 'ioredis'; import bodyParser from 'body-parser'; -let redis: Redis | undefined = undefined; -if (config.REDIS_URL) { - redis = new Redis(config.REDIS_URL); -} const app = express(); const vmManagers = getBgVMManagers(); -const redisRefs: { [key: string]: Redis } = {}; app.use(bodyParser.json()); @@ -18,21 +12,8 @@ Object.values(vmManagers).forEach((manager) => { manager?.runBackgroundJobs(); }); -setInterval(() => { - redis?.setex('currentVBrowserWaiting', 90, Object.keys(redisRefs).length); -}, 60000); - app.post('/assignVM', async (req, res) => { try { - let redis: Redis | undefined = undefined; - if (config.REDIS_URL) { - redis = new Redis(config.REDIS_URL); - redisRefs[req.body.uid] = redis; - setTimeout(() => { - redis?.disconnect(); - delete redisRefs[req.body.uid]; - }, config.VM_ASSIGNMENT_TIMEOUT * 1000); - } // Find a pool that matches the size and region requirements const pool = Object.values(vmManagers).find((mgr) => { return ( @@ -41,18 +22,16 @@ app.post('/assignVM', async (req, res) => { ); }); // TODO maybe there's more than one, load balance between them? - if (redis && pool) { + if (pool) { console.log('assignVM from pool:', pool.getPoolName()); - const vm = await assignVM(redis, pool); - redis?.disconnect(); - delete redisRefs[req.body.uid]; + const vm = await pool.assignVM(req.body.roomId, req.body.uid); return res.json(vm ?? null); } + return res.status(400).end(); } catch (e) { console.warn(e); + return res.status(500).end(); } - console.log('failed to assignVM'); - return res.status(400).end(); }); app.post('/releaseVM', async (req, res) => { @@ -61,15 +40,14 @@ app.post('/releaseVM', async (req, res) => { vmManagers[ req.body.provider + (req.body.isLarge ? 'Large' : '') + req.body.region ]; - redisRefs[req.body.uid]?.disconnect(); - delete redisRefs[req.body.uid]; if (req.body.id) { - await pool?.resetVM(req.body.id); + await pool?.resetVM(req.body.id, req.body.uid); } + return res.end(); } catch (e) { console.warn(e); + return res.status(500).end(); } - return res.end(); }); app.get('/stats', async (req, res) => { @@ -77,22 +55,9 @@ app.get('/stats', async (req, res) => { for (let i = 0; i <= Object.keys(vmManagers).length; i++) { const key = Object.keys(vmManagers)[i]; const vmManager = vmManagers[key]; - const availableVBrowsers = await redis?.lrange( - vmManager?.getRedisQueueKey() || 'availableList', - 0, - -1 - ); - const stagingVBrowsers = await redis?.lrange( - vmManager?.getRedisStagingKey() || 'stagingList', - 0, - -1 - ); - // const terminationVBrowsers = await redis?.smembers( - // vmManager?.getRedisTerminationKey() || 'terminationList', - // ); - const size = await redis?.get( - vmManager?.getRedisPoolSizeKey() || 'vmPoolFull' - ); + const availableVBrowsers = vmManager?.getAvailableVBrowsers(); + const stagingVBrowsers = vmManager?.getStagingVBrowsers(); + const size = await vmManager?.getCurrentSize(); if (key && vmManager) { vmManagerStats[key] = { availableVBrowsers, @@ -115,9 +80,9 @@ app.get('/isFreePoolFull', async (req, res) => { }); let isFull = false; if (freePool) { - const availableCount = await redis?.llen(freePool.getRedisQueueKey()); + const availableCount = await freePool.getAvailableCount(); const limitSize = freePool?.getLimitSize() ?? 0; - const currentSize = await redis?.get(freePool.getRedisPoolSizeKey()); + const currentSize = await freePool.getCurrentSize(); isFull = Boolean( limitSize > 0 && (Number(availableCount) === 0 || diff --git a/sql/schema.sql b/sql/schema.sql index 21b30863aa..0b1dc266a7 100644 --- a/sql/schema.sql +++ b/sql/schema.sql @@ -16,6 +16,10 @@ CREATE TABLE room( "mediaPath" text, PRIMARY KEY ("roomId") ); +CREATE UNIQUE INDEX on room(LOWER(vanity)) WHERE vanity IS NOT NULL; +CREATE INDEX on room(owner) WHERE owner IS NOT NULL; +CREATE INDEX on room("creationTime"); +CREATE INDEX on room USING GIN("roomId" gin_trgm_ops); CREATE TABLE subscriber( "customerId" text, @@ -40,7 +44,27 @@ CREATE TABLE active_user( PRIMARY KEY(uid) ); -CREATE UNIQUE INDEX on room (LOWER(vanity)) WHERE vanity IS NOT NULL; -CREATE INDEX on room(owner) WHERE owner IS NOT NULL; -CREATE INDEX on room("creationTime"); -CREATE INDEX on room USING GIN("roomId" gin_trgm_ops); \ No newline at end of file +CREATE TABLE vbrowser( + id bigserial PRIMARY KEY, -- numeric sequence ID + pool text NOT NULL, -- name of the pool this VM is in, e.g. HetznerLargeUS + vmid text NOT NULL, -- identifier for the VM, only unique across a provider + state text NOT NULL, -- available, staging, used + "creationTime" timestamp with time zone NOT NULL, -- time the VM was created + "heartbeatTime" timestamp with time zone, -- last time a room reported this VM was in use + "resetTime" timestamp with time zone, -- last time the VM was reset + "readyTime" timestamp with time zone, -- last time the VM became ready + "assignTime" timestamp with time zone, -- last time the VM was assigned + "roomId" text, -- room VM assigned to + uid text, -- user requesting the VM + data json, -- metadata for the VM + retries int DEFAULT 0 -- how many tries we checked if VM is up +); +CREATE UNIQUE INDEX on vbrowser(pool, id); +CREATE INDEX on vbrowser(pool, state); +CREATE INDEX on vbrowser("roomId"); +CREATE INDEX on vbrowser(uid); + +CREATE TABLE vbrowser_queue( + "roomId" text NOT NULL PRIMARY KEY, + "queueTime" timestamp with time zone NOT NULL +); \ No newline at end of file