From 434437d1743d83345056a982628cbb678d0ea1a1 Mon Sep 17 00:00:00 2001 From: Hoang Pham Date: Mon, 9 Dec 2024 16:34:19 +0700 Subject: [PATCH] Fix server crashed, regular cleanups Signed-off-by: Hoang Pham --- tests/integration/configMock.js | 42 +++ tests/integration/metrics.spec.mjs | 35 +- tests/integration/socket.spec.mjs | 46 +-- vitest.config.js | 14 +- websocket_server/ApiService.js | 12 +- websocket_server/AppManager.js | 13 +- websocket_server/BackupManager.js | 43 +-- websocket_server/Config.js | 86 +++++ websocket_server/Constants.js | 35 ++ websocket_server/GeneralLRUStrategy.js | 43 +++ websocket_server/GeneralRedisStrategy.js | 69 ++++ websocket_server/InMemoryStrategy.js | 35 ++ websocket_server/LRUCacheStrategy.js | 5 +- websocket_server/PrometheusDataManager.js | 5 +- websocket_server/RedisStrategy.js | 52 +-- websocket_server/RoomDataManager.js | 35 +- websocket_server/ServerManager.js | 147 ++++++-- websocket_server/SharedTokenGenerator.js | 10 +- websocket_server/SocketDataManager.js | 32 -- websocket_server/SocketManager.js | 406 ++++++++++++++++------ websocket_server/StorageManager.js | 17 +- websocket_server/Utils.js | 21 +- websocket_server/main.js | 26 +- 23 files changed, 871 insertions(+), 358 deletions(-) create mode 100644 tests/integration/configMock.js create mode 100644 websocket_server/Config.js create mode 100644 websocket_server/Constants.js create mode 100644 websocket_server/GeneralLRUStrategy.js create mode 100644 websocket_server/GeneralRedisStrategy.js create mode 100644 websocket_server/InMemoryStrategy.js delete mode 100644 websocket_server/SocketDataManager.js diff --git a/tests/integration/configMock.js b/tests/integration/configMock.js new file mode 100644 index 0000000..f0e218f --- /dev/null +++ b/tests/integration/configMock.js @@ -0,0 +1,42 @@ +const defaultMockValues = { + IS_TEST_ENV: true, + BYPASS_SSL_VALIDATION: false, + USE_TLS: false, + TLS_KEY_PATH: null, + TLS_CERT_PATH: null, + STORAGE_STRATEGY: 'lru', + REDIS_URL: null, + FORCE_CLOSE_TIMEOUT: 60 * 1000, + METRICS_TOKEN: null, + JWT_SECRET_KEY: null, + BACKUP_DIR: './backup', + ROOM_CLEANUP_INTERVAL: 1000, + LOCK_TIMEOUT: 1000, + LOCK_RETRY_INTERVAL: 1000, + MAX_BACKUPS_PER_ROOM: 10, + ROOM_MAX_AGE: 1000, + MAX_ROOMS_IN_STORAGE: 1000, +} + +export function createConfigMock(customValues = {}) { + const mockValues = { ...defaultMockValues, ...customValues } + + const computedProperties = { + get JWT_SECRET_KEY() { + return mockValues.JWT_SECRET_KEY + }, + get NEXTCLOUD_WEBSOCKET_URL() { + return mockValues.NEXTCLOUD_WEBSOCKET_URL + }, + get NEXTCLOUD_URL() { + return mockValues.NEXTCLOUD_URL + }, + } + + const mockConfig = { + ...mockValues, + ...computedProperties, + } + + return mockConfig +} diff --git a/tests/integration/metrics.spec.mjs b/tests/integration/metrics.spec.mjs index ef18cea..6fe37c9 100644 --- a/tests/integration/metrics.spec.mjs +++ b/tests/integration/metrics.spec.mjs @@ -1,32 +1,37 @@ import { beforeAll, afterAll, describe, it, expect, vi } from 'vitest' import axios from 'axios' -import ServerManager from '../../websocket_server/ServerManager.js' +import { createConfigMock } from './configMock.js' +import ServerManagerModule from '../../websocket_server/ServerManager.js' +import ConfigModule from '../../websocket_server/Config.js' -const SERVER_URL = 'http://localhost:3008' -const SECRET = 'secret' +vi.mock('../../websocket_server/Config.js', () => ({ + default: createConfigMock({ + NEXTCLOUD_URL: 'http://localhost:3008', + NEXTCLOUD_WEBSOCKET_URL: 'http://localhost:3008', + PORT: '3008', + METRICS_TOKEN: 'secret', + }), +})) -vi.stubEnv('METRICS_TOKEN', SECRET) +const Config = ConfigModule +const ServerManager = ServerManagerModule describe('Metrics endpoint', () => { let serverManager beforeAll(async () => { - serverManager = new ServerManager({ - port: 3008, - storageStrategy: 'lru', - }) - - serverManager.start() + serverManager = new ServerManager() + await serverManager.start() }) afterAll(async () => { - await serverManager.server.close() + await serverManager.gracefulShutdown() }) it('should work with bearer auth', async () => { - const response = await axios.get(`${SERVER_URL}/metrics`, { + const response = await axios.get(`${Config.NEXTCLOUD_URL}/metrics`, { headers: { - Authorization: `Bearer ${SECRET}`, + Authorization: `Bearer ${Config.METRICS_TOKEN}`, }, }) expect(response.status).toBe(200) @@ -39,14 +44,14 @@ describe('Metrics endpoint', () => { }) it('should work with token param', async () => { - const response = await axios.get(`${SERVER_URL}/metrics?token=${SECRET}`) + const response = await axios.get(`${Config.NEXTCLOUD_URL}/metrics?token=${Config.METRICS_TOKEN}`) expect(response.status).toBe(200) expect(response.data).toContain('whiteboard_room_stats{stat="activeRooms"}') }) it('Not return on invalid auth', async () => { try { - await axios.get(`${SERVER_URL}/metrics`, { + await axios.get(`${Config.NEXTCLOUD_URL}/metrics`, { headers: { Authorization: 'Bearer wrongtoken', }, diff --git a/tests/integration/socket.spec.mjs b/tests/integration/socket.spec.mjs index 292f9cc..2d87ee6 100644 --- a/tests/integration/socket.spec.mjs +++ b/tests/integration/socket.spec.mjs @@ -1,13 +1,23 @@ import { beforeAll, afterAll, describe, it, expect, vi } from 'vitest' -import ServerManager from '../../websocket_server/ServerManager.js' -import io from 'socket.io-client' +import { io } from 'socket.io-client' import jwt from 'jsonwebtoken' -import Utils from '../../websocket_server/Utils.js' +import { createConfigMock } from './configMock.js' +import ServerManagerModule from '../../websocket_server/ServerManager.js' +import UtilsModule from '../../websocket_server/Utils.js' +import ConfigModule from '../../websocket_server/Config.js' -const SERVER_URL = 'http://localhost:3009' -const SECRET = 'secret' +vi.mock('../../websocket_server/Config.js', () => ({ + default: createConfigMock({ + NEXTCLOUD_URL: 'http://localhost:3009', + NEXTCLOUD_WEBSOCKET_URL: 'http://localhost:3009', + PORT: '3009', + JWT_SECRET_KEY: 'secret', + }), +})) -vi.stubEnv('JWT_SECRET_KEY', SECRET) +const Config = ConfigModule +const ServerManager = ServerManagerModule +const Utils = UtilsModule function waitFor(socket, event) { return new Promise((resolve) => { @@ -19,16 +29,12 @@ describe('Socket handling', () => { let serverManager, socket beforeAll(async () => { - serverManager = new ServerManager({ - port: 3009, - storageStrategy: 'lru', - }) - - serverManager.start() + serverManager = new ServerManager() + await serverManager.start() - socket = io(SERVER_URL, { + socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { - token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, SECRET), + token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, Config.JWT_SECRET_KEY), }, }) @@ -39,11 +45,11 @@ describe('Socket handling', () => { afterAll(async () => { await socket.disconnect() - await serverManager.server.close() + await serverManager.gracefulShutdown() }) it('socket invalid jwt', async () => { - const socket = io(SERVER_URL, { + const socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, 'wrongsecret'), }, @@ -56,9 +62,9 @@ describe('Socket handling', () => { }) it('socket valid jwt', async () => { - const socket = io(SERVER_URL, { + const socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { - token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, SECRET), + token: jwt.sign({ roomID: 123, user: { name: 'Admin' } }, Config.JWT_SECRET_KEY), }, }) return new Promise((resolve) => { @@ -78,9 +84,9 @@ describe('Socket handling', () => { }) it('read only socket', async () => { - const socket = io(SERVER_URL, { + const socket = io(Config.NEXTCLOUD_WEBSOCKET_URL, { auth: { - token: jwt.sign({ roomID: 123, user: { name: 'Admin' }, isFileReadOnly: true }, SECRET), + token: jwt.sign({ roomID: 123, user: { name: 'Admin' }, isFileReadOnly: true }, Config.JWT_SECRET_KEY), }, }) return new Promise((resolve) => { diff --git a/vitest.config.js b/vitest.config.js index ca49f62..dc5f510 100644 --- a/vitest.config.js +++ b/vitest.config.js @@ -6,10 +6,10 @@ import { defineConfig } from 'vitest/config' export default defineConfig({ - test: { - environment: 'node', - include: [ - 'tests/integration/*.spec.?(c|m)[jt]s?(x)' - ], - }, - }) + test: { + environment: 'node', + include: [ + 'tests/integration/*.spec.?(c|m)[jt]s?(x)' + ], + }, +}) diff --git a/websocket_server/ApiService.js b/websocket_server/ApiService.js index 383e781..1587bb4 100644 --- a/websocket_server/ApiService.js +++ b/websocket_server/ApiService.js @@ -7,16 +7,12 @@ import fetch from 'node-fetch' import https from 'https' -import dotenv from 'dotenv' -import Utils from './Utils.js' -dotenv.config() +import Config from './Config.js' export default class ApiService { constructor(tokenGenerator) { - this.NEXTCLOUD_URL = process.env.NEXTCLOUD_URL - this.IS_DEV = Utils.parseBooleanFromEnv(process.env.IS_DEV) - this.agent = this.IS_DEV ? new https.Agent({ rejectUnauthorized: false }) : null + this.agent = (Config.USE_TLS && Config.BYPASS_SSL_VALIDATION) ? new https.Agent({ rejectUnauthorized: false }) : null this.tokenGenerator = tokenGenerator } @@ -50,7 +46,7 @@ export default class ApiService { } async getRoomDataFromServer(roomID, jwtToken) { - const url = `${this.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` + const url = `${Config.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` const options = this.fetchOptions('GET', jwtToken) return this.fetchData(url, options) } @@ -58,7 +54,7 @@ export default class ApiService { async saveRoomDataToServer(roomID, roomData, lastEditedUser, files) { console.log(`[${roomID}] Saving room data to server: ${roomData.length} elements, ${Object.keys(files).length} files`) - const url = `${this.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` + const url = `${Config.NEXTCLOUD_URL}/index.php/apps/whiteboard/${roomID}` const body = { data: { diff --git a/websocket_server/AppManager.js b/websocket_server/AppManager.js index e9e95f3..45d669d 100644 --- a/websocket_server/AppManager.js +++ b/websocket_server/AppManager.js @@ -3,19 +3,14 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import dotenv from 'dotenv' import express from 'express' -import PrometheusDataManager from './PrometheusDataManager.js' - -dotenv.config() +import Config from './Config.js' export default class AppManager { - constructor(storageManager) { + constructor(metricsManager) { this.app = express() - this.storageManager = storageManager - this.metricsManager = new PrometheusDataManager(storageManager) - this.METRICS_TOKEN = process.env.METRICS_TOKEN + this.metricsManager = metricsManager this.setupRoutes() } @@ -30,7 +25,7 @@ export default class AppManager { async metricsHandler(req, res) { const token = req.headers.authorization?.split(' ')[1] || req.query.token - if (!this.METRICS_TOKEN || token !== this.METRICS_TOKEN) { + if (!Config.METRICS_TOKEN || token !== Config.METRICS_TOKEN) { return res.status(403).send('Unauthorized') } this.metricsManager.updateMetrics() diff --git a/websocket_server/BackupManager.js b/websocket_server/BackupManager.js index 95ffb98..5695e3c 100644 --- a/websocket_server/BackupManager.js +++ b/websocket_server/BackupManager.js @@ -10,18 +10,11 @@ import path from 'path' import crypto from 'crypto' import zlib from 'zlib' import { promisify } from 'util' +import Config from './Config.js' const gzip = promisify(zlib.gzip) const gunzip = promisify(zlib.gunzip) -/** - * @typedef {object} BackupOptions - * @property {string} [backupDir='./backup'] - Directory to store backups - * @property {number} [maxBackupsPerRoom=5] - Maximum number of backups to keep per room - * @property {number} [lockTimeout=5000] - Maximum time in ms to wait for a lock - * @property {number} [lockRetryInterval=50] - Time in ms between lock retry attempts - */ - /** * @typedef {object} BackupData * @property {string} id - Unique identifier for the backup @@ -39,15 +32,9 @@ export default class BackupManager { /** * Creates a new BackupManager instance - * @param {BackupOptions} [options] - Configuration options */ - constructor(options = {}) { - const { backupDir = './backup', maxBackupsPerRoom = 5 } = options - this.backupDir = backupDir - this.maxBackupsPerRoom = maxBackupsPerRoom + constructor() { this.locks = new Map() - this.lockTimeout = options.lockTimeout || 5000 // 5 seconds - this.lockRetryInterval = options.lockRetryInterval || 50 // 50ms this.init() } @@ -57,7 +44,7 @@ export default class BackupManager { */ async init() { try { - await fs.mkdir(this.backupDir, { recursive: true }) + await fs.mkdir(Config.BACKUP_DIR, { recursive: true }) await this.cleanupTemporaryFiles() } catch (error) { console.error('Failed to initialize BackupManager:', error) @@ -70,12 +57,12 @@ export default class BackupManager { */ async cleanupTemporaryFiles() { try { - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) const tmpFiles = files.filter((f) => f.endsWith('.tmp')) await Promise.all( tmpFiles.map((file) => fs - .unlink(path.join(this.backupDir, file)) + .unlink(path.join(Config.BACKUP_DIR, file)) .catch(console.error), ), ) @@ -92,11 +79,11 @@ export default class BackupManager { async acquireLock(roomId) { const startTime = Date.now() while (this.locks.get(roomId)) { - if (Date.now() - startTime > this.lockTimeout) { + if (Date.now() - startTime > Config.LOCK_TIMEOUT) { throw new Error(`Lock acquisition timeout for room ${roomId}`) } await new Promise((resolve) => - setTimeout(resolve, this.lockRetryInterval), + setTimeout(resolve, Config.LOCK_RETRY_INTERVAL), ) } this.locks.set(roomId, Date.now()) @@ -187,7 +174,7 @@ export default class BackupManager { */ async writeBackupFile(roomId, backupData) { const backupFile = path.join( - this.backupDir, + Config.BACKUP_DIR, `${roomId}_${backupData.timestamp}.bak`, ) const tempFile = `${backupFile}.tmp` @@ -205,7 +192,7 @@ export default class BackupManager { */ async getLatestBackup(roomId) { const sanitizedRoomId = this.sanitizeRoomId(roomId) - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) const roomBackups = files .filter( (f) => @@ -218,7 +205,7 @@ export default class BackupManager { try { const compressed = await fs.readFile( - path.join(this.backupDir, roomBackups[0]), + path.join(Config.BACKUP_DIR, roomBackups[0]), ) const decompressed = await gunzip(compressed) const backup = JSON.parse(decompressed.toString()) @@ -246,7 +233,7 @@ export default class BackupManager { const sanitizedRoomId = this.sanitizeRoomId(roomId) try { - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) const roomBackups = files .filter( (f) => @@ -256,15 +243,15 @@ export default class BackupManager { .sort() .reverse() - if (roomBackups.length <= this.maxBackupsPerRoom) { + if (roomBackups.length <= Config.MAX_BACKUPS_PER_ROOM) { return } - const filesToDelete = roomBackups.slice(this.maxBackupsPerRoom) + const filesToDelete = roomBackups.slice(Config.MAX_BACKUPS_PER_ROOM) await Promise.all( filesToDelete.map((file) => fs - .unlink(path.join(this.backupDir, file)) + .unlink(path.join(Config.BACKUP_DIR, file)) .catch((error) => { console.error( `Failed to delete backup ${file}:`, @@ -285,7 +272,7 @@ export default class BackupManager { */ async getAllBackups(roomId) { const sanitizedRoomId = this.sanitizeRoomId(roomId) - const files = await fs.readdir(this.backupDir) + const files = await fs.readdir(Config.BACKUP_DIR) return files .filter( (f) => diff --git a/websocket_server/Config.js b/websocket_server/Config.js new file mode 100644 index 0000000..cfa0f18 --- /dev/null +++ b/websocket_server/Config.js @@ -0,0 +1,86 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +/* eslint-disable no-console */ + +import dotenv from 'dotenv' +import crypto from 'crypto' +import { + DEFAULT_NEXTCLOUD_URL, + DEFAULT_PORT, + DEFAULT_STORAGE_STRATEGY, + DEFAULT_FORCE_CLOSE_TIMEOUT, + DEFAULT_REDIS_URL, + DEFAULT_ROOM_CLEANUP_INTERVAL, + DEFAULT_LOCK_TIMEOUT, + DEFAULT_LOCK_RETRY_INTERVAL, + DEFAULT_MAX_BACKUPS_PER_ROOM, + DEFAULT_BACKUP_DIR, + DEFAULT_ROOM_MAX_AGE, + DEFAULT_MAX_ROOMS_IN_STORAGE, + DEFAULT_CACHED_TOKEN_TTL, +} from './Constants.js' +import Utils from './Utils.js' + +dotenv.config() + +const Config = { + IS_TEST_ENV: process.env.NODE_ENV === 'test', + + BYPASS_SSL_VALIDATION: Utils.parseBooleanFromEnv(process.env.BYPASS_SSL_VALIDATION), + + PORT: process.env.PORT || DEFAULT_PORT, + + USE_TLS: Utils.parseBooleanFromEnv(process.env.TLS), + + TLS_KEY_PATH: process.env.TLS_KEY || null, + + TLS_CERT_PATH: process.env.TLS_CERT || null, + + STORAGE_STRATEGY: process.env.STORAGE_STRATEGY || DEFAULT_STORAGE_STRATEGY, + + REDIS_URL: process.env.REDIS_URL || DEFAULT_REDIS_URL, + + FORCE_CLOSE_TIMEOUT: process.env.FORCE_CLOSE_TIMEOUT || DEFAULT_FORCE_CLOSE_TIMEOUT, + + METRICS_TOKEN: process.env.METRICS_TOKEN || null, + + BACKUP_DIR: process.env.BACKUP_DIR || DEFAULT_BACKUP_DIR, + + MAX_BACKUPS_PER_ROOM: process.env.MAX_BACKUPS_PER_ROOM || DEFAULT_MAX_BACKUPS_PER_ROOM, + + LOCK_TIMEOUT: process.env.LOCK_TIMEOUT || DEFAULT_LOCK_TIMEOUT, + + LOCK_RETRY_INTERVAL: process.env.LOCK_RETRY_INTERVAL || DEFAULT_LOCK_RETRY_INTERVAL, + + ROOM_CLEANUP_INTERVAL: process.env.ROOM_CLEANUP_INTERVAL || DEFAULT_ROOM_CLEANUP_INTERVAL, + + ROOM_MAX_AGE: process.env.ROOM_MAX_AGE || DEFAULT_ROOM_MAX_AGE, + + MAX_ROOMS_IN_STORAGE: process.env.MAX_ROOMS_IN_STORAGE || DEFAULT_MAX_ROOMS_IN_STORAGE, + + CACHED_TOKEN_TTL: process.env.CACHED_TOKEN_TTL || DEFAULT_CACHED_TOKEN_TTL, + + get JWT_SECRET_KEY() { + if (!process.env.JWT_SECRET_KEY) { + const newSecret = crypto.randomBytes(32).toString('hex') + process.env.JWT_SECRET_KEY = newSecret + console.log('Generated new JWT_SECRET_KEY:', newSecret) + } else { + console.log('Using existing JWT_SECRET_KEY from environment') + } + return process.env.JWT_SECRET_KEY + }, + + get NEXTCLOUD_WEBSOCKET_URL() { + return Utils.getOriginFromUrl(process.env.NEXTCLOUD_URL || DEFAULT_NEXTCLOUD_URL) + }, + + get NEXTCLOUD_URL() { + return this.NEXTCLOUD_WEBSOCKET_URL + }, +} + +export default Config diff --git a/websocket_server/Constants.js b/websocket_server/Constants.js new file mode 100644 index 0000000..d0f1012 --- /dev/null +++ b/websocket_server/Constants.js @@ -0,0 +1,35 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +export const DEFAULT_NEXTCLOUD_URL = 'http://nextcloud.local' + +export const DEFAULT_PORT = 3002 + +export const DEFAULT_STORAGE_STRATEGY = 'lru' + +export const DEFAULT_FORCE_CLOSE_TIMEOUT = 60 * 60 * 1000 + +export const DEFAULT_REDIS_URL = 'redis://localhost:6379' + +export const DEFAULT_BACKUP_DIR = './backup' + +export const DEFAULT_MAX_BACKUPS_PER_ROOM = 5 + +export const DEFAULT_LOCK_TIMEOUT = 5000 + +export const DEFAULT_LOCK_RETRY_INTERVAL = 50 + +export const DEFAULT_ROOM_CLEANUP_INTERVAL = 5 * 60 * 1000 + +export const DEFAULT_ROOM_MAX_AGE = 30 * 60 * 1000 + +export const DEFAULT_CACHED_TOKEN_TTL = 10 * 60 * 1000 + +export const DEFAULT_MAX_ROOMS_IN_STORAGE = 1000 + +export const DEFAULT_EMPTY_ROOM_DATA = Object.freeze({ + elements: [], + files: {}, +}) diff --git a/websocket_server/GeneralLRUStrategy.js b/websocket_server/GeneralLRUStrategy.js new file mode 100644 index 0000000..be34684 --- /dev/null +++ b/websocket_server/GeneralLRUStrategy.js @@ -0,0 +1,43 @@ +/* eslint-disable no-console */ + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import { LRUCache } from 'lru-cache' +import StorageStrategy from './StorageStrategy.js' + +export default class GeneralLRUStrategy extends StorageStrategy { + + constructor(options = {}) { + const { max = 1000, ttl = 1000 * 60 * 60 * 24, ttlAutopurge = true } = options + super() + this.cache = new LRUCache({ + max, + ttl, + ttlAutopurge, + }) + } + + async get(key) { + return this.cache.get(key) + } + + async set(key, value) { + this.cache.set(key, value) + } + + async delete(key) { + this.cache.delete(key) + } + + async clear() { + this.cache.clear() + } + + getRooms() { + throw new Error('Method not implemented.') + } + +} diff --git a/websocket_server/GeneralRedisStrategy.js b/websocket_server/GeneralRedisStrategy.js new file mode 100644 index 0000000..37e8514 --- /dev/null +++ b/websocket_server/GeneralRedisStrategy.js @@ -0,0 +1,69 @@ +/* eslint-disable no-console */ + +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import StorageStrategy from './StorageStrategy.js' + +export default class GeneralRedisStrategy extends StorageStrategy { + + constructor(redisClient, options = {}) { + const { prefix = 'general_', ttl = null } = options + super() + this.prefix = prefix + this.ttl = ttl + this.client = redisClient + } + + async get(key) { + try { + const data = await this.client.get(`${this.prefix}${key}`) + if (!data) return null + return JSON.parse(data) + } catch (error) { + console.error(`Error getting data for key ${key}:`, error) + return null + } + } + + async set(key, value) { + try { + const serializedData = JSON.stringify(value) + if (this.ttl) { + await this.client.set(`${this.prefix}${key}`, serializedData, { + EX: this.ttl, + }) + } else { + await this.client.set(`${this.prefix}${key}`, serializedData) + } + } catch (error) { + console.error(`Error setting data for key ${key}:`, error) + } + } + + async delete(key) { + try { + await this.client.del(`${this.prefix}${key}`) + } catch (error) { + console.error(`Error deleting key ${key}:`, error) + } + } + + async clear() { + try { + const keys = await this.client.keys(`${this.prefix}*`) + if (keys.length > 0) { + await this.client.del(keys) + } + } catch (error) { + console.error('Error clearing general data:', error) + } + } + + getRooms() { + throw new Error('Method not implemented.') + } + +} diff --git a/websocket_server/InMemoryStrategy.js b/websocket_server/InMemoryStrategy.js new file mode 100644 index 0000000..955d11c --- /dev/null +++ b/websocket_server/InMemoryStrategy.js @@ -0,0 +1,35 @@ +/** + * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors + * SPDX-License-Identifier: AGPL-3.0-or-later + */ + +import StorageStrategy from './StorageStrategy.js' + +export default class InMemoryStrategy extends StorageStrategy { + + constructor() { + super() + this.store = new Map() + } + + async get(key) { + return this.store.get(key) + } + + async set(key, value) { + this.store.set(key, value) + } + + async delete(key) { + this.store.delete(key) + } + + async clear() { + this.store.clear() + } + + getRooms() { + throw new Error('Method not implemented.') + } + +} diff --git a/websocket_server/LRUCacheStrategy.js b/websocket_server/LRUCacheStrategy.js index 73d6610..66249d5 100644 --- a/websocket_server/LRUCacheStrategy.js +++ b/websocket_server/LRUCacheStrategy.js @@ -8,6 +8,7 @@ import StorageStrategy from './StorageStrategy.js' import { LRUCache } from 'lru-cache' import Room from './Room.js' +import Config from './Config.js' export default class LRUCacheStrategy extends StorageStrategy { @@ -15,8 +16,8 @@ export default class LRUCacheStrategy extends StorageStrategy { super() this.apiService = apiService this.cache = new LRUCache({ - max: 1000, - ttl: 30 * 60 * 1000, + max: Config.MAX_ROOMS_IN_STORAGE, + ttl: Config.ROOM_MAX_AGE, ttlAutopurge: true, dispose: async (value, key) => { console.log(`[${key}] Disposing room`) diff --git a/websocket_server/PrometheusDataManager.js b/websocket_server/PrometheusDataManager.js index 1671074..7dffb82 100644 --- a/websocket_server/PrometheusDataManager.js +++ b/websocket_server/PrometheusDataManager.js @@ -4,12 +4,11 @@ */ import { register, Gauge } from 'prom-client' -import SystemMonitor from './SystemMonitor.js' export default class PrometheusDataManager { - constructor(storageManager) { - this.systemMonitor = new SystemMonitor(storageManager) + constructor(systemMonitor) { + this.systemMonitor = systemMonitor this.initializeGauges() } diff --git a/websocket_server/RedisStrategy.js b/websocket_server/RedisStrategy.js index d7c37cf..27264e1 100644 --- a/websocket_server/RedisStrategy.js +++ b/websocket_server/RedisStrategy.js @@ -8,16 +8,15 @@ import StorageStrategy from './StorageStrategy.js' import { createClient } from 'redis' import Room from './Room.js' +import Config from './Config.js' export default class RedisStrategy extends StorageStrategy { - constructor(apiService) { - super() - this.apiService = apiService - this.client = createClient({ - url: process.env.REDIS_URL || 'redis://localhost:6379', + static createRedisClient() { + return createClient({ + url: Config.REDIS_URL, retry_strategy: (options) => { - if (options.error && options.error.code === 'ECONNREFUSED') { + if (options.error?.code === 'ECONNREFUSED') { return new Error('The server refused the connection') } if (options.total_retry_time > 1000 * 60 * 60) { @@ -29,26 +28,18 @@ export default class RedisStrategy extends StorageStrategy { return Math.min(options.attempt * 100, 3000) }, }) - this.client.on('error', (err) => - console.error('Redis Client Error', err), - ) - this.connect() } - async connect() { - try { - await this.client.connect() - } catch (error) { - console.error('Failed to connect to Redis:', error) - throw error - } + constructor(redisClient, apiService) { + super() + this.apiService = apiService + this.client = redisClient } async get(key) { try { const data = await this.client.get(key) - if (!data) return null - return this.deserialize(data) + return data ? this.deserialize(data) : null } catch (error) { console.error(`Error getting data for key ${key}:`, error) return null @@ -58,7 +49,9 @@ export default class RedisStrategy extends StorageStrategy { async set(key, value) { try { const serializedData = this.serialize(value) - await this.client.set(key, serializedData, { EX: 30 * 60 }) + await this.client.set(key, serializedData, { + EX: Config.ROOM_MAX_AGE / 1000, + }) } catch (error) { console.error(`Error setting data for key ${key}:`, error) } @@ -83,7 +76,10 @@ export default class RedisStrategy extends StorageStrategy { async clear() { try { - await this.client.flushDb() + const rooms = await this.getRooms() + for (const [key] of rooms) { + await this.delete(key) + } } catch (error) { console.error('Error clearing Redis database:', error) } @@ -93,9 +89,15 @@ export default class RedisStrategy extends StorageStrategy { try { const keys = await this.client.keys('*') const rooms = new Map() + for (const key of keys) { + if (key.startsWith('token_') || key.startsWith('socket_')) { + continue + } const room = await this.get(key) - if (room && !key.startsWith('token:') && !key.startsWith('socket:')) rooms.set(key, room) + if (room) { + rooms.set(key, room) + } } return rooms } catch (error) { @@ -105,9 +107,9 @@ export default class RedisStrategy extends StorageStrategy { } serialize(value) { - return value instanceof Room - ? JSON.stringify(value.toJSON()) - : JSON.stringify(value) + return JSON.stringify( + value instanceof Room ? value.toJSON() : value, + ) } deserialize(data) { diff --git a/websocket_server/RoomDataManager.js b/websocket_server/RoomDataManager.js index 96bbf99..35039bd 100644 --- a/websocket_server/RoomDataManager.js +++ b/websocket_server/RoomDataManager.js @@ -8,6 +8,7 @@ import Utils from './Utils.js' import ApiService from './ApiService.js' import BackupManager from './BackupManager.js' import StorageManager from './StorageManager.js' +import { DEFAULT_EMPTY_ROOM_DATA } from './Constants.js' /** * @typedef {object} RoomData @@ -30,18 +31,6 @@ import StorageManager from './StorageManager.js' */ export default class RoomDataManager { - /** - * Default configuration for room data - * @static - * @readonly - */ - static CONFIG = Object.freeze({ - defaultData: { - elements: [], - files: {}, - }, - }) - /** * @param {StorageManager} storageManager - Manager for room storage operations * @param {ApiService} apiService - Service for API communications @@ -133,7 +122,7 @@ export default class RoomDataManager { normalizeRoomData(data) { // Always return default data structure if input is null/undefined if (!data) { - return RoomDataManager.CONFIG.defaultData + return DEFAULT_EMPTY_ROOM_DATA } const normalized = { @@ -299,24 +288,4 @@ export default class RoomDataManager { return backupData } - /** - * Handles empty room cleanup - * @param {string} roomId - Room identifier - * @return {Promise} - */ - async handleEmptyRoom(roomId) { - await this.cleanupEmptyRoom(roomId) - return null - } - - /** - * Removes empty room from storage - * @param {string} roomId - Room identifier - * @return {Promise} - */ - async cleanupEmptyRoom(roomId) { - await this.storageManager.delete(roomId) - Utils.logOperation(roomId, 'Empty room removed from cache') - } - } diff --git a/websocket_server/ServerManager.js b/websocket_server/ServerManager.js index 20c008f..e61c687 100644 --- a/websocket_server/ServerManager.js +++ b/websocket_server/ServerManager.js @@ -15,22 +15,67 @@ import StorageManager from './StorageManager.js' import RoomDataManager from './RoomDataManager.js' import AppManager from './AppManager.js' import SocketManager from './SocketManager.js' -import Utils from './Utils.js' import BackupManager from './BackupManager.js' +import PrometheusDataManager from './PrometheusDataManager.js' +import SystemMonitor from './SystemMonitor.js' +import Config from './Config.js' +import RedisStrategy from './RedisStrategy.js' export default class ServerManager { - constructor(config) { - this.config = config + constructor() { this.closing = false + this.tokenGenerator = new SharedTokenGenerator() + this.apiService = new ApiService(this.tokenGenerator) - this.backupManager = new BackupManager({}) - this.storageManager = StorageManager.create(this.config.storageStrategy, this.apiService) - this.roomDataManager = new RoomDataManager(this.storageManager, this.apiService, this.backupManager) - this.appManager = new AppManager(this.storageManager) + + this.backupManager = new BackupManager() + + this.redisClient = Config.STORAGE_STRATEGY === 'redis' + ? RedisStrategy.createRedisClient() + : null + + if (this.redisClient) { + this.redisClient.connect().catch(error => { + console.error('Failed to connect to Redis:', error) + throw error + }) + } + + this.roomStorage = StorageManager.create( + Config.STORAGE_STRATEGY, + this.redisClient, + this.apiService, + null, + ) + + this.roomDataManager = new RoomDataManager(this.roomStorage, this.apiService, this.backupManager) + + this.systemMonitor = new SystemMonitor(this.roomStorage) + + this.metricsManager = new PrometheusDataManager(this.systemMonitor) + + this.appManager = new AppManager(this.metricsManager) + this.server = this.createConfiguredServer(this.appManager.getApp()) - this.socketManager = new SocketManager(this.server, this.roomDataManager, this.storageManager) + + this.socketDataStorage = Config.STORAGE_STRATEGY === 'redis' + ? StorageManager.create('general-redis', this.redisClient, null, { prefix: 'socket_' }) + : StorageManager.create('in-mem') + + this.cachedTokenStorage = Config.STORAGE_STRATEGY === 'redis' + ? StorageManager.create('general-redis', this.redisClient, null, { prefix: 'token_', ttl: Config.CACHED_TOKEN_TTL / 1000 }) + : StorageManager.create('general-lru', null, null, { ttl: Config.CACHED_TOKEN_TTL }) + + this.socketManager = new SocketManager( + this.server, + this.roomDataManager, + this.roomStorage, + this.socketDataStorage, + this.cachedTokenStorage, + this.redisClient, + ) } readTlsCredentials(keyPath, certPath) { @@ -41,17 +86,16 @@ export default class ServerManager { } createConfiguredServer(app) { - const useTls = Utils.parseBooleanFromEnv(this.config.tls) - const serverType = useTls ? https : http - const serverOptions = useTls ? this.readTlsCredentials(this.config.keyPath, this.config.certPath) : {} + const serverType = Config.USE_TLS ? https : http + const serverOptions = Config.USE_TLS ? this.readTlsCredentials(Config.TLS_KEY_PATH, Config.TLS_CERT_PATH) : {} return serverType.createServer(serverOptions, app) } start() { return new Promise((resolve, reject) => { - this.server.listen(this.config.port, () => { - console.log(`Listening on port: ${this.config.port}`) + this.server.listen(Config.PORT, () => { + console.log(`Listening on port: ${Config.PORT}`) resolve() }) @@ -60,31 +104,74 @@ export default class ServerManager { reject(error) }) - process.on('SIGTERM', () => this.gracefulShutdown()) - process.on('SIGINT', () => this.gracefulShutdown()) + const handleShutdown = async (signal) => { + try { + console.log(`Received ${signal} signal`) + await this.gracefulShutdown() + if (!Config.IS_TEST_ENV) { + process.exit(0) + } + } catch (error) { + console.error('Failed to shutdown gracefully:', error) + if (!Config.IS_TEST_ENV) { + process.exit(1) + } + } + } + + process.on('SIGTERM', () => handleShutdown('SIGTERM')) + process.on('SIGINT', () => handleShutdown('SIGINT')) }) } async gracefulShutdown() { - if (this.closing) return + if (this.closing) { + console.log('Shutdown already in progress') + return + } this.closing = true - console.log('Received shutdown signal, saving all data...') - try { - await this.roomDataManager.removeAllRoomData() - this.socketManager.io.close() - console.log('Closing server...') - this.server.close(() => { - console.log('HTTP server closed.') - process.exit(0) - }) + console.log('Starting graceful shutdown...') + + const cleanup = async () => { + await this.socketManager.io.close() + console.log('Stopped accepting new connections') - setTimeout(() => { - console.error('Force closing server after timeout') - process.exit(1) - }, this.config.forceCloseTimeout) + await Promise.all([ + // Storage cleanup + (async () => { + await this.socketDataStorage.clear() + await this.cachedTokenStorage.clear() + await this.roomStorage.clear() + console.log('Storage cleared') + })(), + + // Redis cleanup if needed + this.redisClient && (async () => { + await this.redisClient.quit() + console.log('Redis client closed') + })(), + + // Server cleanup with timeout + new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + reject(new Error('Server close timeout')) + }, Config.FORCE_CLOSE_TIMEOUT) + + this.server.close(() => { + clearTimeout(timeout) + console.log('HTTP server closed') + resolve() + }) + }), + ]) + } + + try { + await cleanup() + console.log('Graceful shutdown completed') } catch (error) { console.error('Error during graceful shutdown:', error) - process.exit(1) + throw error } } diff --git a/websocket_server/SharedTokenGenerator.js b/websocket_server/SharedTokenGenerator.js index b3a63b6..678b162 100644 --- a/websocket_server/SharedTokenGenerator.js +++ b/websocket_server/SharedTokenGenerator.js @@ -4,20 +4,14 @@ */ import crypto from 'crypto' -import dotenv from 'dotenv' - -dotenv.config() +import Config from './Config.js' export default class SharedTokenGenerator { - constructor() { - this.SHARED_SECRET = process.env.JWT_SECRET_KEY - } - handle(roomId) { const timestamp = Date.now() const payload = `${roomId}:${timestamp}` - const hmac = crypto.createHmac('sha256', this.SHARED_SECRET) + const hmac = crypto.createHmac('sha256', Config.JWT_SECRET_KEY) hmac.update(payload) const signature = hmac.digest('hex') return `${payload}:${signature}` diff --git a/websocket_server/SocketDataManager.js b/websocket_server/SocketDataManager.js deleted file mode 100644 index 4b0cad1..0000000 --- a/websocket_server/SocketDataManager.js +++ /dev/null @@ -1,32 +0,0 @@ -/** - * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors - * SPDX-License-Identifier: AGPL-3.0-or-later - */ - -export default class SocketDataManager { - - constructor(storageManager) { - this.storageManager = storageManager - } - - async setCachedToken(token, decodedData) { - await this.storageManager.set(`token:${token}`, decodedData) - } - - async getCachedToken(token) { - return this.storageManager.get(`token:${token}`) - } - - async setSocketData(socketId, data) { - await this.storageManager.set(`socket:${socketId}`, data) - } - - async getSocketData(socketId) { - return this.storageManager.get(`socket:${socketId}`) - } - - async deleteSocketData(socketId) { - await this.storageManager.delete(`socket:${socketId}`) - } - -} diff --git a/websocket_server/SocketManager.js b/websocket_server/SocketManager.js index 7ede85e..6f26787 100644 --- a/websocket_server/SocketManager.js +++ b/websocket_server/SocketManager.js @@ -5,61 +5,88 @@ * SPDX-License-Identifier: AGPL-3.0-or-later */ -import { Server as SocketIO } from 'socket.io' +import { Server as SocketIO, Socket } from 'socket.io' import prometheusMetrics from 'socket.io-prometheus' import jwt from 'jsonwebtoken' -import dotenv from 'dotenv' import Utils from './Utils.js' import { createAdapter } from '@socket.io/redis-streams-adapter' -import SocketDataManager from './SocketDataManager.js' - -dotenv.config() +import RoomDataManager from './RoomDataManager.js' +import StorageManager from './StorageManager.js' +import { Server } from 'http' +import { Server as HttpsServer } from 'https' +import Config from './Config.js' +import StorageStrategy from './StorageStrategy.js' +/** + * Manages WebSocket connections and room interactions + */ export default class SocketManager { - constructor(server, roomDataManager, storageManager) { + /** + * Creates a new SocketManager instance + * @param {Server|HttpsServer} server - HTTP/HTTPS server instance + * @param {RoomDataManager} roomDataManager - Manager for room data + * @param {StorageManager} storageManager - Manager for room data storage + * @param {StorageStrategy} socketDataStorage - Manager for socket data storage + * @param {StorageStrategy} cachedTokenStorage - Manager for cached token storage + * @param {object} redisClient - Shared Redis client + */ + constructor(server, roomDataManager, storageManager, socketDataStorage, cachedTokenStorage, redisClient) { this.roomDataManager = roomDataManager this.storageManager = storageManager - this.socketDataManager = new SocketDataManager(storageManager) + this.socketDataStorage = socketDataStorage + this.cachedTokenStorage = cachedTokenStorage + this.redisClient = redisClient + this.io = this.createSocketServer(server) + this.init() + } - this.io = new SocketIO(server, { + // SERVER SETUP METHODS + /** + * Creates and configures the Socket.IO server + * @param {Server|HttpsServer} server - HTTP/HTTPS server instance + * @return {SocketIO.Server} Configured Socket.IO server instance + */ + createSocketServer(server) { + return new SocketIO(server, { transports: ['websocket', 'polling'], cors: { - origin: process.env.NEXTCLOUD_URL || 'http://nextcloud.local', + origin: Config.NEXTCLOUD_WEBSOCKET_URL, methods: ['GET', 'POST', 'PUT', 'DELETE', 'OPTIONS'], credentials: true, }, }) - - this.init() } + /** + * Initializes the socket server and sets up necessary configurations + * @return {Promise} + */ async init() { + await this.setupAdapter() + this.setupEventHandlers() + } + + /** + * Sets up the appropriate adapter (Redis or in-memory) + * @return {Promise} + */ + async setupAdapter() { if (this.shouldUseRedis()) { await this.setupRedisStreamsAdapter() } else { console.log('Using default in-memory adapter') } - - this.io.use(this.socketAuthenticateHandler.bind(this)) - prometheusMetrics(this.io) - this.io.on('connection', this.handleConnection.bind(this)) - } - - shouldUseRedis() { - return this.storageManager.strategy.constructor.name === 'RedisStrategy' } + /** + * Configures Redis Streams adapter for Socket.IO + * @return {Promise} + */ async setupRedisStreamsAdapter() { console.log('Setting up Redis Streams adapter') try { - const redisClient = this.storageManager.strategy.client - this.io.adapter( - createAdapter(redisClient, { - maxLen: 10000, - }), - ) - + this.io.adapter(createAdapter(this.redisClient, { maxLen: 10000 })) console.log('Redis Streams adapter set up successfully') } catch (error) { console.error('Failed to set up Redis Streams adapter:', error) @@ -67,69 +94,67 @@ export default class SocketManager { } } + /** + * Determines if Redis should be used as the adapter + * @return {boolean} + */ + shouldUseRedis() { + return !!this.redisClient + } + + // AUTHENTICATION METHODS + /** + * Handles socket authentication + * @param {Socket} socket - Socket.IO socket instance + * @param {Function} next - Next middleware function + * @return {Promise} + */ async socketAuthenticateHandler(socket, next) { try { const { token } = socket.handshake.auth if (!token) throw new Error('No token provided') const decodedData = await this.verifyToken(token) - console.log('decodedData', decodedData) - await this.socketDataManager.setSocketData(socket.id, decodedData) + await this.socketDataStorage.set(socket.id, decodedData) if (decodedData.isFileReadOnly) { socket.emit('read-only') } next() } catch (error) { - const { secret } = socket.handshake.auth - - try { - jwt.verify( - secret, - process.env.JWT_SECRET_KEY, - { - algorithm: 'HS256', - }, - ) - next(new Error('Connection verified')) - } catch (e) {} - - next(new Error('Authentication error')) + await this.handleAuthError(socket, next) } } - handleConnection(socket) { - socket.emit('init-room') - socket.on('join-room', (roomID) => this.joinRoomHandler(socket, roomID)) - socket.on('server-broadcast', (roomID, encryptedData, iv) => - this.serverBroadcastHandler(socket, roomID, encryptedData, iv), - ) - socket.on('server-volatile-broadcast', (roomID, encryptedData) => - this.serverVolatileBroadcastHandler(socket, roomID, encryptedData), - ) - socket.on('image-add', (roomID, id, data) => this.imageAddHandler(socket, roomID, id, data)) - socket.on('image-remove', (roomID, id, data) => this.imageRemoveHandler(socket, roomID, id, data)) - socket.on('image-get', (roomID, id, data) => this.imageGetHandler(socket, roomID, id, data)) - socket.on('disconnecting', () => { - const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id) - this.disconnectingHandler(socket, rooms) - }) - socket.on('disconnect', () => this.handleDisconnect(socket)) - } - - async handleDisconnect(socket) { - await this.socketDataManager.deleteSocketData(socket.id) - socket.removeAllListeners() + /** + * Handles authentication errors + * @param {Socket} socket - Socket.IO socket instance + * @param {Function} next - Next middleware function + */ + async handleAuthError(socket, next) { + const { secret } = socket.handshake.auth + try { + jwt.verify(secret, Config.JWT_SECRET_KEY, { algorithm: 'HS256' }) + next(new Error('Connection verified')) + } catch (e) { + next(new Error('Authentication error')) + } } + /** + * Verifies JWT token + * @param {string} token - JWT token to verify + * @return {Promise} Decoded token data + */ async verifyToken(token) { - const cachedToken = await this.socketDataManager.getCachedToken(token) + const cachedToken = await this.cachedTokenStorage.get(token) + console.log('cachedTokenStorage', this.cachedTokenStorage) if (cachedToken) return cachedToken return new Promise((resolve, reject) => { jwt.verify( token, - process.env.JWT_SECRET_KEY, + Config.JWT_SECRET_KEY, async (err, decoded) => { if (err) { console.log( @@ -139,32 +164,71 @@ export default class SocketManager { ) return reject(new Error('Authentication error')) } - await this.socketDataManager.setCachedToken(token, decoded) + await this.cachedTokenStorage.set(token, decoded) resolve(decoded) }, ) }) } - async isSocketReadOnly(socketId) { - const socketData = await this.socketDataManager.getSocketData(socketId) - return socketData ? !!socketData.isFileReadOnly : false + // EVENT SETUP METHODS + /** + * Sets up all event handlers for the socket server + */ + setupEventHandlers() { + this.io.use(this.socketAuthenticateHandler.bind(this)) + prometheusMetrics(this.io) + this.io.on('connection', this.handleConnection.bind(this)) } - async getUserSocketsAndIds(roomID) { - const sockets = await this.io.in(roomID).fetchSockets() - return Promise.all(sockets.map(async (s) => { - const data = await this.socketDataManager.getSocketData(s.id) - return { - socketId: s.id, - user: data.user, - userId: data.user.id, - } - })) + /** + * Handles new socket connections + * @param {Socket} socket - Socket.IO socket instance + */ + handleConnection(socket) { + socket.emit('init-room') + this.setupSocketEventListeners(socket) + } + + /** + * Sets up event listeners for a specific socket + * @param {Socket} socket - Socket.IO socket instance + */ + setupSocketEventListeners(socket) { + const events = { + 'join-room': this.joinRoomHandler, + 'server-broadcast': this.serverBroadcastHandler, + 'server-volatile-broadcast': this.serverVolatileBroadcastHandler, + 'image-add': this.imageAddHandler, + 'image-remove': this.imageRemoveHandler, + 'image-get': this.imageGetHandler, + disconnect: this.disconnectHandler, + } + + // Handle regular events + Object.entries(events).forEach(([event, handler]) => { + socket.on(event, (...args) => + this.safeSocketHandler(socket, () => handler.apply(this, [socket, ...args])), + ) + }) + + // Handle disconnecting separately to ensure correct room capture + socket.on('disconnecting', () => { + const rooms = Array.from(socket.rooms).filter((room) => room !== socket.id) + this.safeSocketHandler(socket, () => this.disconnectingHandler(socket, rooms)) + }) } + // ROOM EVENT HANDLERS + /** + * Handles room join requests + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @return {Promise} + */ async joinRoomHandler(socket, roomID) { - console.log(`[${roomID}] ${socket.id} has joined ${roomID}`) + const socketData = await this.socketDataStorage.get(socket.id) + console.log(`[${roomID}] ${socketData.user.name} has joined ${roomID}`) await socket.join(roomID) const userSocketsAndIds = await this.getUserSocketsAndIds(roomID) @@ -195,6 +259,14 @@ export default class SocketManager { } } + /** + * Handles broadcast messages to room + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {ArrayBuffer} encryptedData - Encrypted message data + * @param {string} iv - Initialization vector + * @return {Promise} + */ async serverBroadcastHandler(socket, roomID, encryptedData, iv) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomID) || isReadOnly) return @@ -208,35 +280,19 @@ export default class SocketManager { }, socket.id) } - async processRoomDataUpdate(roomID, updateData, socketId) { - const socketData = await this.socketDataManager.getSocketData(socketId) - if (!socketData) return - - const userSocketsAndIds = await this.getUserSocketsAndIds(roomID) - const currentRoom = await this.storageManager.get(roomID) - - const roomData = { - elements: updateData.elements || currentRoom?.data || [], - files: updateData.files || currentRoom?.files || {}, - } - - await this.roomDataManager.syncRoomData( - roomID, - roomData, - userSocketsAndIds.map(u => u.userId), - socketData.user.id, - ) - } - + /** + * Handles volatile broadcasts (e.g., mouse movements) + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {ArrayBuffer} encryptedData - Encrypted message data + */ async serverVolatileBroadcastHandler(socket, roomID, encryptedData) { const payload = JSON.parse( Utils.convertArrayBufferToString(encryptedData), ) if (payload.type === 'MOUSE_LOCATION') { - const socketData = await this.socketDataManager.getSocketData( - socket.id, - ) + const socketData = await this.socketDataStorage.get(socket.id) if (!socketData) return const eventData = { type: 'MOUSE_LOCATION', @@ -255,6 +311,15 @@ export default class SocketManager { } } + // IMAGE HANDLING METHODS + /** + * Handles image addition to room + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {string} id - Image identifier + * @param {object} data - Image data + * @return {Promise} + */ async imageAddHandler(socket, roomID, id, data) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomID) || isReadOnly) return @@ -270,6 +335,12 @@ export default class SocketManager { }, socket.id) } + /** + * Handles image removal from room + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomID - Room identifier + * @param {string} id - Image identifier + */ async imageRemoveHandler(socket, roomID, id) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomID) || isReadOnly) return @@ -286,6 +357,12 @@ export default class SocketManager { }, socket.id) } + /** + * Handles image retrieval requests + * @param {Socket} socket - Socket.IO socket instance + * @param {string} roomId - Room identifier + * @param {string} id - Image identifier + */ async imageGetHandler(socket, roomId, id) { const isReadOnly = await this.isSocketReadOnly(socket.id) if (!socket.rooms.has(roomId) || isReadOnly) return @@ -302,8 +379,37 @@ export default class SocketManager { } } + // DISCONNECTION HANDLERS + /** + * Handles socket disconnection + * @param {Socket} socket - Socket.IO socket instance + */ + async disconnectHandler(socket) { + try { + // Clean up socket data first + await this.socketDataStorage.delete(socket.id) + + // Remove all listeners + socket.removeAllListeners() + + // Force disconnect if still connected + if (socket.connected) { + socket.disconnect(true) + } + + Utils.logOperation('SOCKET', `Cleaned up socket: ${socket.id}`) + } catch (error) { + Utils.logError('SOCKET', `Failed to cleanup socket: ${socket.id}`, error) + } + } + + /** + * Handles socket disconnecting event + * @param {Socket} socket - Socket.IO socket instance + * @param {string[]} rooms - Array of room IDs + */ async disconnectingHandler(socket, rooms) { - const socketData = await this.socketDataManager.getSocketData(socket.id) + const socketData = await this.socketDataStorage.get(socket.id) if (!socketData) return console.log(`[${socketData.fileId}] ${socketData.user.name} has disconnected`) console.log('socket rooms', rooms) @@ -316,17 +422,105 @@ export default class SocketManager { if (otherUserSockets.length > 0) { this.io.to(roomID).emit('room-user-change', otherUserSockets) } else { - this.roomDataManager.cleanupEmptyRoom(roomID) + await this.storageManager.delete(roomID) } this.queueRoomUpdate(roomID, {}, socket.id) } } + // ROOM DATA MANAGEMENT + /** + * Processes room data updates + * @param {string} roomID - Room identifier + * @param {object} updateData - Data to update + * @param {string} socketId - Socket identifier + * @return {Promise} + */ + async processRoomDataUpdate(roomID, updateData, socketId) { + const socketData = await this.socketDataStorage.get(socketId) + if (!socketData) return + + const userSocketsAndIds = await this.getUserSocketsAndIds(roomID) + const currentRoom = await this.storageManager.get(roomID) + + const roomData = { + elements: updateData.elements || currentRoom?.data || [], + files: updateData.files || currentRoom?.files || {}, + } + + await this.roomDataManager.syncRoomData( + roomID, + roomData, + userSocketsAndIds.map(u => u.userId), + socketData.user.id, + ) + } + + /** + * Queues room updates for processing + * @param {string} roomID - Room identifier + * @param {object} updateData - Data to update + * @param {string} socketId - Socket identifier + */ async queueRoomUpdate(roomID, updateData, socketId) { this.processRoomDataUpdate(roomID, updateData, socketId).catch(error => { console.error(`Failed to process room update for ${roomID}:`, error) }) } + // UTILITY METHODS + /** + * Safely executes socket handlers with error handling + * @param {Socket} socket - Socket.IO socket instance + * @param {Function} handler - Handler function to execute + * @return {Promise} Success status + */ + async safeSocketHandler(socket, handler) { + try { + const socketData = await this.socketDataStorage.get(socket.id) + if (!socketData?.user) { + socket.emit('error', 'Invalid session') + socket.disconnect() + return false + } + return await handler() + } catch (error) { + console.error('Socket handler error:', error) + socket.emit('error', 'Internal server error') + return false + } + } + + /** + * Checks if a socket is in read-only mode + * @param {string} socketId - Socket identifier + * @return {Promise} Read-only status + */ + async isSocketReadOnly(socketId) { + const socketData = await this.socketDataStorage.get(socketId) + return socketData ? !!socketData.isFileReadOnly : false + } + + /** + * Gets user sockets and IDs for a room + * @param {string} roomID - Room identifier + * @return {Promise>} + */ + async getUserSocketsAndIds(roomID) { + const sockets = await this.io.in(roomID).fetchSockets() + return Promise.all(sockets.map(async (s) => { + const data = await this.socketDataStorage.get(s.id) + if (!data?.user?.id) { + console.warn(`Invalid socket data for socket ${s.id}`) + return null + } + return { + socketId: s.id, + user: data.user, + userId: data.user.id, + } + })).then(results => results.filter(Boolean)) + } + } diff --git a/websocket_server/StorageManager.js b/websocket_server/StorageManager.js index e58255c..3122676 100644 --- a/websocket_server/StorageManager.js +++ b/websocket_server/StorageManager.js @@ -8,6 +8,9 @@ import StorageStrategy from './StorageStrategy.js' import LRUCacheStrategy from './LRUCacheStrategy.js' import RedisStrategy from './RedisStrategy.js' +import InMemoryStrategy from './InMemoryStrategy.js' +import GeneralLRUStrategy from './GeneralLRUStrategy.js' +import GeneralRedisStrategy from './GeneralRedisStrategy.js' export default class StorageManager { @@ -43,8 +46,7 @@ export default class StorageManager { return this.strategy.getRooms() } - static create(strategyType, apiService) { - + static create(strategyType = 'lru', redisClient = null, apiService = null, options = {}) { let strategy switch (strategyType) { @@ -52,7 +54,16 @@ export default class StorageManager { strategy = new LRUCacheStrategy(apiService) break case 'redis': - strategy = new RedisStrategy(apiService) + strategy = new RedisStrategy(redisClient, apiService) + break + case 'general-lru': + strategy = new GeneralLRUStrategy(options) + break + case 'general-redis': + strategy = new GeneralRedisStrategy(redisClient, options) + break + case 'in-mem': + strategy = new InMemoryStrategy() break default: throw new Error('Invalid storage strategy type') diff --git a/websocket_server/Utils.js b/websocket_server/Utils.js index d230806..b042611 100644 --- a/websocket_server/Utils.js +++ b/websocket_server/Utils.js @@ -19,24 +19,33 @@ export default class Utils { return value === 'true' } + static getOriginFromUrl(url) { + try { + return new URL(url).origin + } catch (error) { + console.error('Invalid URL:', url) + return null + } + } + /** * Logs operation details - * @param {string} roomId - Room identifier + * @param {string} context - Context identifier * @param {string} message - Log message * @param {object} [data] - Additional data to log */ - static logOperation(roomId, message, data = {}) { - console.log(`[${roomId}] ${message}:`, data) + static logOperation(context, message, data = {}) { + console.log(`[${context}] ${message}:`, data) } /** * Logs error details - * @param {string} roomId - Room identifier + * @param {string} context - Context identifier * @param {string} message - Error message * @param {Error} error - Error object */ - static logError(roomId, message, error) { - console.error(`[${roomId}] ${message}:`, error) + static logError(context, message, error) { + console.error(`[${context}] ${message}:`, error) } } diff --git a/websocket_server/main.js b/websocket_server/main.js index 2014c27..01a96e7 100644 --- a/websocket_server/main.js +++ b/websocket_server/main.js @@ -5,36 +5,16 @@ * SPDX-FileCopyrightText: 2024 Nextcloud GmbH and Nextcloud contributors * SPDX-License-Identifier: AGPL-3.0-or-later */ - -import dotenv from 'dotenv' import ServerManager from './ServerManager.js' - -dotenv.config() - -const { - PORT = 3002, - TLS, - TLS_KEY: keyPath, - TLS_CERT: certPath, - STORAGE_STRATEGY = 'lru', -} = process.env - -const FORCE_CLOSE_TIMEOUT = 60 * 1000 +import Config from './Config.js' async function main() { try { - const serverManager = new ServerManager({ - port: PORT, - tls: TLS, - keyPath, - certPath, - storageStrategy: STORAGE_STRATEGY, - forceCloseTimeout: FORCE_CLOSE_TIMEOUT, - }) + const serverManager = new ServerManager() await serverManager.start() - console.log(`Server started successfully on port ${PORT}`) + console.log(`Server started successfully on port ${Config.PORT}`) process.on('SIGTERM', () => serverManager.gracefulShutdown()) process.on('SIGINT', () => serverManager.gracefulShutdown())