Skip to content

Commit

Permalink
rewrite vbrowser pooling with pg
Browse files Browse the repository at this point in the history
  • Loading branch information
howardchung committed Jan 12, 2024
1 parent d0f0821 commit 9ef84b0
Show file tree
Hide file tree
Showing 15 changed files with 561 additions and 597 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ This project supports creating virtual browsers (using https://github.com/m1k1o/
- Make sure you have an SSH key pair set up on the server (`id_rsa` in `~/.ssh` directory)
- Add `DOCKER_VM_HOST=localhost` to your .env file (can substitute localhost for a public hostname)
- Add `NODE_ENV=development` to .env to enable create-on-demand behavior for VMs
- Configure Redis by adding `REDIS_URL` to your .env file (Redis is required for virtual browser management)
- Configure Postgres by adding `DATABASE_URL` to your .env file (Postgres is required for virtual browser management)

### Room Persistence

Expand Down
9 changes: 1 addition & 8 deletions server/cleanup.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,4 @@
import { Client } from 'pg';
import config from './config';

const postgres = new Client({
connectionString: config.DATABASE_URL,
ssl: { rejectUnauthorized: false },
});
postgres.connect();
import { postgres } from './utils/postgres';

cleanupPostgres();
setInterval(cleanupPostgres, 5 * 60 * 1000);
Expand Down
4 changes: 2 additions & 2 deletions server/config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
require('dotenv').config();

const defaults = {
REDIS_URL: '', // Optional, for room persistence and VM queueing (localhost:6379 for a local install)
DATABASE_URL: '', // Optional, for permanent rooms (localhost:5432 for a local install)
REDIS_URL: '', // Optional, for metrics
DATABASE_URL: '', // Optional, for permanent rooms and VBrowser management
YOUTUBE_API_KEY: '', // Optional, provide one to enable searching YouTube
NODE_ENV: '', // Usually, you should let process.env.NODE_ENV override this
FIREBASE_ADMIN_SDK_CONFIG: '', // Optional, for features requiring sign-in/authentication
Expand Down
65 changes: 28 additions & 37 deletions server/room.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,19 @@
import config from './config';

import axios from 'axios';
import Redis from 'ioredis';
import { Server, Socket } from 'socket.io';
import { Client, QueryResult } from 'pg';

import { getUser, validateUserToken } from './utils/firebase';
import { redisCount, redisCountDistinct } from './utils/redis';
import { redis, redisCount, redisCountDistinct } from './utils/redis';
import { getIsSubscriberByEmail } from './utils/stripe';
import { AssignedVM } from './vm/base';
import { getStartOfDay } from './utils/time';
import { updateObject, upsertObject } from './utils/postgres';
import { postgres, updateObject, upsertObject } from './utils/postgres';
import { fetchYoutubeVideo, getYoutubeVideoID } from './utils/youtube';
import { v4 as uuidv4 } from 'uuid';
//@ts-ignore
import twitch from 'twitch-m3u8';

let redis: Redis | undefined = undefined;
if (config.REDIS_URL) {
redis = new Redis(config.REDIS_URL);
}
let postgres: Client | undefined = undefined;
if (config.DATABASE_URL) {
postgres = new Client({
connectionString: config.DATABASE_URL,
ssl: { rejectUnauthorized: false },
});
postgres.connect();
}
import { QueryResult } from 'pg';

export class Room {
// Serialized state
Expand Down Expand Up @@ -343,9 +329,6 @@ export class Room {
await redis.lpush('vBrowserSessionMS', Number(new Date()) - assignTime);
await redis.ltrim('vBrowserSessionMS', 0, 24);
}
if (redis && uid) {
await redis.del('vBrowserUIDLock:' + uid);
}
try {
await axios.post(
'http://localhost:' + config.VMWORKER_PORT + '/releaseVM',
Expand Down Expand Up @@ -382,6 +365,10 @@ export class Room {
if (data === '') {
this.playlistNext(null);
}
// The room video is changing so remove room from vbrowser queue
postgres?.query('DELETE FROM vbrowser_queue WHERE "roomId" = $1', [
this.roomId,
]);
};

public addChatMessage = (socket: Socket | null, chatMsg: ChatMessageBase) => {
Expand Down Expand Up @@ -880,24 +867,17 @@ export class Room {
redis.expireat('vBrowserUIDs', expireTime);
const uidMinutes = await redis.zincrby('vBrowserUIDMinutes', 1, uid);
redis.expireat('vBrowserUIDMinutes', expireTime);
// TODO limit users based on these counts

const uidLock = await redis.set(
'vBrowserUIDLock:' + uid,
'1',
'EX',
70,
'NX'
);
if (!uidLock) {
socket.emit(
'errorMessage',
'There is already an active vBrowser for this user.'
);
return;
}
// TODO limit users based on client or uid usage
}
}
// TODO (howard) check if the user or room has a VM already in postgres
if (false) {
socket.emit(
'errorMessage',
'There is already an active vBrowser for this user.'
);
return;
}
let isLarge = false;
let region = config.DEFAULT_VM_REGION;
if (data && data.uid && data.token) {
Expand Down Expand Up @@ -940,16 +920,27 @@ export class Room {

redisCount('vBrowserStarts');
this.cmdHost(socket, 'vbrowser://');
// Put the room in the vbrowser queue
await postgres?.query('INSERT INTO vbrowser_queue VALUES ($1, $2)', [
this.roomId,
new Date(),
]);
const assignmentResp = await axios.post(
'http://localhost:' + config.VMWORKER_PORT + '/assignVM',
{
isLarge,
region,
uid,
roomId: this.roomId,
}
);
const assignment: AssignedVM = assignmentResp.data;
if (this.video !== 'vbrowser://') {
const inQueue = await postgres?.query(
'SELECT "roomId" FROM vbrowser_queue WHERE "roomId" = $1 LIMIT 1',
[this.roomId]
);
if (!Boolean(inQueue?.rows.length)) {
// This room is no longer waiting for VM (maybe user gave up)
return;
}
if (!assignment) {
Expand Down
45 changes: 17 additions & 28 deletions server/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import bodyParser from 'body-parser';
import compression from 'compression';
import os from 'os';
import cors from 'cors';
import Redis from 'ioredis';
import https from 'https';
import http from 'http';
import { Server } from 'socket.io';
import { searchYoutube } from './utils/youtube';
import { Room } from './room';
import {
redis,
getRedisCountDay,
getRedisCountDayDistinct,
redisCount,
Expand All @@ -23,11 +23,10 @@ import {
} from './utils/stripe';
import { deleteUser, validateUserToken } from './utils/firebase';
import path from 'path';
import { Client } from 'pg';
import { getStartOfDay } from './utils/time';
import { getBgVMManagers, getSessionLimitSeconds } from './vm/utils';
import { hashString } from './utils/string';
import { insertObject, upsertObject } from './utils/postgres';
import { postgres, insertObject, upsertObject } from './utils/postgres';
import axios from 'axios';
import crypto from 'crypto';
import zlib from 'zlib';
Expand All @@ -52,7 +51,7 @@ if (process.env.NODE_ENV === 'development') {
const gzip = util.promisify(zlib.gzip);

const releaseInterval = 5 * 60 * 1000;
const releaseBatches = 10;
const releaseBatches = config.NODE_ENV === 'development' ? 1 : 10;
const app = express();
let server: any = null;
if (config.SSL_KEY_FILE && config.SSL_CRT_FILE) {
Expand All @@ -63,18 +62,6 @@ if (config.SSL_KEY_FILE && config.SSL_CRT_FILE) {
server = new http.Server(app);
}
const io = new Server(server, { cors: {}, transports: ['websocket'] });
let redis: Redis | undefined = undefined;
if (config.REDIS_URL) {
redis = new Redis(config.REDIS_URL);
}
let postgres: Client | undefined = undefined;
if (config.DATABASE_URL) {
postgres = new Client({
connectionString: config.DATABASE_URL,
ssl: { rejectUnauthorized: false },
});
postgres.connect();
}

const launchTime = Number(new Date());
const rooms = new Map<string, Room>();
Expand Down Expand Up @@ -148,7 +135,7 @@ app.post('/subtitle', async (req, res) => {
.update(data, 'utf8')
.digest()
.toString('hex');
let gzipData = (await gzip(data)) as Buffer;
let gzipData = await gzip(data);
await redis.setex('subtitle:' + hash, 24 * 60 * 60, gzipData);
redisCount('subUploads');
return res.json({ hash });
Expand Down Expand Up @@ -327,9 +314,9 @@ app.delete('/deleteAccount', async (req, res) => {
}
if (postgres) {
// Delete rooms
await postgres?.query('DELETE FROM room WHERE owner = $1', [decoded.uid]);
await postgres.query('DELETE FROM room WHERE owner = $1', [decoded.uid]);
// Delete linked accounts
await postgres?.query('DELETE FROM link_account WHERE uid = $1', [
await postgres.query('DELETE FROM link_account WHERE uid = $1', [
decoded.uid,
]);
}
Expand Down Expand Up @@ -436,12 +423,12 @@ app.get('/linkAccount', async (req, res) => {
}
// Get the linked accounts for the user
let linkAccounts: LinkAccount[] = [];
if (decoded?.uid) {
const result = await postgres?.query(
if (decoded?.uid && postgres) {
const { rows } = await postgres.query(
'SELECT kind, accountid, accountname, discriminator FROM link_account WHERE uid = $1',
[decoded?.uid]
);
linkAccounts = result.rows;
linkAccounts = rows;
}
return res.json(linkAccounts);
});
Expand Down Expand Up @@ -638,12 +625,11 @@ async function minuteMetrics() {
for (let i = 0; i < roomArr.length; i++) {
const room = roomArr[i];
if (room.vBrowser && room.vBrowser.id) {
// Renew the locks
await redis?.expire(
'lock:' + room.vBrowser.provider + ':' + room.vBrowser.id,
300
// Update the heartbeat
await postgres?.query(
`UPDATE vbrowser SET "heartbeatTime" = $1 WHERE "roomId" = $2 and vmid = $3`,
[new Date(), room.roomId, room.vBrowser.id]
);
await redis?.expire('vBrowserUIDLock:' + room.vBrowser?.creatorUID, 70);

const expireTime = getStartOfDay() / 1000 + 86400;
if (room.vBrowser?.creatorClientID) {
Expand Down Expand Up @@ -804,7 +790,10 @@ async function getStats() {
const currentMemUsage = [process.memoryUsage().rss];

// Singleton stats below (same for all shards so don't combine)
let vBrowserWaiting = Number(await redis?.get('currentVBrowserWaiting'));
let vBrowserWaiting = Number(
(await postgres?.query('SELECT count(1) FROM vbrowser_queue'))?.rows[0]
?.count
);
const cpuUsage = os.loadavg();
const redisUsage = Number(
(await redis?.info())
Expand Down
9 changes: 2 additions & 7 deletions server/syncSubs.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,13 @@
import { Client } from 'pg';
import config from './config';
import { getUserByEmail } from './utils/firebase';
import { insertObject, updateObject } from './utils/postgres';
import { insertObject, newPostgres, updateObject } from './utils/postgres';
import { getAllActiveSubscriptions, getAllCustomers } from './utils/stripe';
import { Client as DiscordClient, IntentsBitField } from 'discord.js';

let lastSubs = '';
let currentSubs = '';

const postgres2 = new Client({
connectionString: config.DATABASE_URL,
ssl: { rejectUnauthorized: false },
});
postgres2.connect();
const postgres2 = newPostgres();

// set up the Discord admin bot
const discordBot = new DiscordClient({
Expand Down
7 changes: 1 addition & 6 deletions server/timeSeries.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
import config from './config';
import Redis from 'ioredis';
import { statsAgg } from './utils/statsAgg';
import axios from 'axios';

let redis: Redis | undefined = undefined;
if (config.REDIS_URL) {
redis = new Redis(config.REDIS_URL);
}
import { redis } from './utils/redis';

statsTimeSeries();
setInterval(statsTimeSeries, 5 * 60 * 1000);
Expand Down
27 changes: 27 additions & 0 deletions server/utils/postgres.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
import { Client, QueryResult } from 'pg';
import config from '../config';

export let postgres: Client | undefined = undefined;
if (config.DATABASE_URL) {
postgres = new Client({
connectionString: config.DATABASE_URL,
ssl: { rejectUnauthorized: false },
});
postgres.connect();
}

/**
* Use this if we need a new connection instead of sharing.
* Guarantees we'll return a client because we throw if we don't have it configured
* @returns
*/
export function newPostgres() {
if (!config.DATABASE_URL) {
throw new Error('postgres not configured');
}
const postgres = new Client({
connectionString: config.DATABASE_URL,
ssl: { rejectUnauthorized: false },
});
postgres.connect();
return postgres;
}

export async function updateObject(
postgres: Client,
Expand Down
2 changes: 1 addition & 1 deletion server/utils/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import config from '../config';
import Redis from 'ioredis';
import { getStartOfHour } from './time';

let redis: Redis | undefined = undefined;
export let redis: Redis | undefined = undefined;
if (config.REDIS_URL) {
redis = new Redis(config.REDIS_URL);
}
Expand Down
Loading

0 comments on commit 9ef84b0

Please sign in to comment.