From 489d7fec95c51b123a659c0aecd7a85638e4d749 Mon Sep 17 00:00:00 2001 From: Georg Bremer Date: Tue, 30 Apr 2024 12:19:54 +0200 Subject: [PATCH 1/6] chore: Gracefully shutdown the embedder --- packages/embedder/EmbeddingsJobQueueStream.ts | 8 ++++++++ packages/embedder/embedder.ts | 11 +++++++---- packages/embedder/logMemoryUse.ts | 2 +- packages/embedder/logPerformance.ts | 2 +- packages/embedder/mergeAsyncIterators.ts | 8 +++++--- packages/embedder/resetStalledJobs.ts | 2 +- 6 files changed, 23 insertions(+), 10 deletions(-) diff --git a/packages/embedder/EmbeddingsJobQueueStream.ts b/packages/embedder/EmbeddingsJobQueueStream.ts index 49c3e4c9e58..d3337be0636 100644 --- a/packages/embedder/EmbeddingsJobQueueStream.ts +++ b/packages/embedder/EmbeddingsJobQueueStream.ts @@ -13,10 +13,16 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator { } orchestrator: WorkflowOrchestrator + done: boolean + constructor(orchestrator: WorkflowOrchestrator) { this.orchestrator = orchestrator + this.done = false } async next(): Promise> { + if (this.done) { + return {done: true as const, value: undefined} + } const pg = getKysely() const getJob = (isFailed: boolean) => { return pg @@ -57,9 +63,11 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator { } } return() { + this.done = true return Promise.resolve({done: true as const, value: undefined}) } throw(error: any) { + this.done = true return Promise.resolve({done: true as const, value: error}) } } diff --git a/packages/embedder/embedder.ts b/packages/embedder/embedder.ts index e89a7b60a97..2e08370c0ef 100644 --- a/packages/embedder/embedder.ts +++ b/packages/embedder/embedder.ts @@ -48,10 +48,11 @@ const run = async () => { const streams = mergeAsyncIterators(jobQueueStreams) const kill: NodeJS.SignalsListener = (signal) => { - Logger.log(`Kill signal received: ${signal}`) - primaryLock?.release() + Logger.log(`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`) + primaryLock?.release().catch(() => {}) + // close streams manually, streams.return only works if any of the stream retruns a datum, if these are idle, it will wait forever + jobQueueStreams.forEach((s) => s.return()) streams.return?.() - process.exit() } process.on('SIGTERM', kill) process.on('SIGINT', kill) @@ -66,7 +67,9 @@ const run = async () => { } // On graceful shutdown - Logger.log('Streaming Complete. Goodbye!') + Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`) + process.exit() } run() + diff --git a/packages/embedder/logMemoryUse.ts b/packages/embedder/logMemoryUse.ts index afe3259aee5..46ae3b03968 100644 --- a/packages/embedder/logMemoryUse.ts +++ b/packages/embedder/logMemoryUse.ts @@ -6,5 +6,5 @@ export const logMemoryUse = () => { const {rss} = memoryUsage const usedMB = Math.floor(rss / MB) console.log('Memory use:', usedMB, 'MB') - }, 10000) + }, 10000).unref() } diff --git a/packages/embedder/logPerformance.ts b/packages/embedder/logPerformance.ts index 96742475db5..af4db92945b 100644 --- a/packages/embedder/logPerformance.ts +++ b/packages/embedder/logPerformance.ts @@ -14,6 +14,6 @@ export const logPerformance = (logEvery: number, resetEvery: number) => { logs = 0 start = performance.now() } - }, logEvery * 1000) + }, logEvery * 1000).unref() return counter } diff --git a/packages/embedder/mergeAsyncIterators.ts b/packages/embedder/mergeAsyncIterators.ts index e274e0cca6f..b1f8b38eb56 100644 --- a/packages/embedder/mergeAsyncIterators.ts +++ b/packages/embedder/mergeAsyncIterators.ts @@ -87,10 +87,12 @@ export function mergeAsyncIterators[] | []>( } } catch (err) { // Unwind remaining iterators on failure - try { - await Promise.all(iterators.map((iterator) => iterator.return?.())) - } catch {} + await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) throw err } + finally { + // Unwind remaining iterators on success + await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) + } })() } diff --git a/packages/embedder/resetStalledJobs.ts b/packages/embedder/resetStalledJobs.ts index 592b468faee..7269a3ce519 100644 --- a/packages/embedder/resetStalledJobs.ts +++ b/packages/embedder/resetStalledJobs.ts @@ -14,5 +14,5 @@ export const resetStalledJobs = () => { })) .where('startAt', '<', new Date(Date.now() - ms('5m'))) .execute() - }, ms('5m')) + }, ms('5m')).unref() } From 099e61fae3a6506aad2486f399c5883dce3c9bf7 Mon Sep 17 00:00:00 2001 From: Georg Bremer Date: Tue, 30 Apr 2024 12:29:42 +0200 Subject: [PATCH 2/6] Prettier --- packages/embedder/embedder.ts | 5 +++-- packages/embedder/mergeAsyncIterators.ts | 3 +-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/embedder/embedder.ts b/packages/embedder/embedder.ts index 2e08370c0ef..54d6662ae36 100644 --- a/packages/embedder/embedder.ts +++ b/packages/embedder/embedder.ts @@ -48,7 +48,9 @@ const run = async () => { const streams = mergeAsyncIterators(jobQueueStreams) const kill: NodeJS.SignalsListener = (signal) => { - Logger.log(`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`) + Logger.log( + `Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.` + ) primaryLock?.release().catch(() => {}) // close streams manually, streams.return only works if any of the stream retruns a datum, if these are idle, it will wait forever jobQueueStreams.forEach((s) => s.return()) @@ -72,4 +74,3 @@ const run = async () => { } run() - diff --git a/packages/embedder/mergeAsyncIterators.ts b/packages/embedder/mergeAsyncIterators.ts index b1f8b38eb56..1565668fe77 100644 --- a/packages/embedder/mergeAsyncIterators.ts +++ b/packages/embedder/mergeAsyncIterators.ts @@ -89,8 +89,7 @@ export function mergeAsyncIterators[] | []>( // Unwind remaining iterators on failure await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) throw err - } - finally { + } finally { // Unwind remaining iterators on success await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) } From c7a75b7e6194c23485fb16a8cfd99347f14c9ba6 Mon Sep 17 00:00:00 2001 From: Georg Bremer Date: Tue, 30 Apr 2024 12:59:07 +0200 Subject: [PATCH 3/6] Refactor `mergeAsyncIterators` to properly return --- packages/embedder/embedder.ts | 2 - packages/embedder/mergeAsyncIterators.ts | 128 +++++++++++------------ 2 files changed, 62 insertions(+), 68 deletions(-) diff --git a/packages/embedder/embedder.ts b/packages/embedder/embedder.ts index 54d6662ae36..8de33d0c9bb 100644 --- a/packages/embedder/embedder.ts +++ b/packages/embedder/embedder.ts @@ -52,8 +52,6 @@ const run = async () => { `Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.` ) primaryLock?.release().catch(() => {}) - // close streams manually, streams.return only works if any of the stream retruns a datum, if these are idle, it will wait forever - jobQueueStreams.forEach((s) => s.return()) streams.return?.() } process.on('SIGTERM', kill) diff --git a/packages/embedder/mergeAsyncIterators.ts b/packages/embedder/mergeAsyncIterators.ts index 1565668fe77..186074c34b8 100644 --- a/packages/embedder/mergeAsyncIterators.ts +++ b/packages/embedder/mergeAsyncIterators.ts @@ -15,83 +15,79 @@ type Result> = UnYield[] | []>( iterators: T ): AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result]}[number]> { - return (async function* () { - type ResultThunk = () => [number, Result] - let count = iterators.length as number - let capability: PromiseCapability | undefined - const queuedResults: ResultThunk[] = [] - const getNext = async (idx: number, iterator: T[number]) => { - try { - const next = await iterator.next() - if (next.done) { - if (--count === 0 && capability !== undefined) { - capability.resolve(null) - } - } else { - resolveResult(() => { - void getNext(idx, iterator) - return [idx, next.value] - }) + type ResultThunk = () => [number, Result] + let count = iterators.length as number + let capability: PromiseCapability | undefined + const queuedResults: ResultThunk[] = [] + const getNext = async (idx: number, iterator: T[number]) => { + try { + const next = await iterator.next() + if (next.done) { + if (--count === 0 && capability !== undefined) { + capability.resolve(null) } - } catch (error) { + } else { resolveResult(() => { - throw error + void getNext(idx, iterator) + return [idx, next.value] }) } + } catch (error) { + resolveResult(() => { + throw error + }) } - const resolveResult = (resultThunk: ResultThunk) => { - if (capability === undefined) { - queuedResults.push(resultThunk) - } else { - capability.resolve(resultThunk) - } + } + const resolveResult = (resultThunk: ResultThunk) => { + if (capability === undefined) { + queuedResults.push(resultThunk) + } else { + capability.resolve(resultThunk) } + } - try { - // Begin all iterators - for (const [idx, iterable] of iterators.entries()) { - void getNext(idx, iterable) + // Begin all iterators + for (const [idx, iterable] of iterators.entries()) { + void getNext(idx, iterable) + } + + const it = { + [Symbol.asyncIterator]: () => it, + next: async () => { + const nextQueuedResult = queuedResults.shift() + if (nextQueuedResult !== undefined) { + return {done: false as const, value: nextQueuedResult()} + } + if (count === 0) { + return {done: true as const, value: undefined} } - // Delegate to iterables as results complete - while (true) { - while (true) { - const nextQueuedResult = queuedResults.shift() - if (nextQueuedResult === undefined) { - break - } else { - yield nextQueuedResult() - } - } - if (count === 0) { - break - } else { - // Promise.withResolvers() is not yet implemented in node - capability = { - resolve: undefined as any, - reject: undefined as any, - promise: undefined as any - } - capability.promise = new Promise((res, rej) => { - capability!.resolve = res - capability!.reject = rej - }) - const nextResult = await capability.promise - if (nextResult === null) { - break - } else { - capability = undefined - yield nextResult() - } - } + // Promise.withResolvers() is not yet implemented in node + capability = { + resolve: undefined as any, + reject: undefined as any, + promise: undefined as any + } + capability.promise = new Promise((res, rej) => { + capability!.resolve = res + capability!.reject = rej + }) + const nextResult = await capability.promise + if (nextResult === null) { + return {done: true as const, value: undefined} + } else { + capability = undefined + return {done: false as const, value: nextResult()} } - } catch (err) { - // Unwind remaining iterators on failure + }, + return: async () => { await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) - throw err - } finally { - // Unwind remaining iterators on success + return {done: true as const, value: undefined} + }, + throw: async (error) => { await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) + return {done: true as const, value: undefined} } - })() + } + return it } From c860c91bcf403212541dc85bf3f33c6adfcd37ce Mon Sep 17 00:00:00 2001 From: Georg Bremer Date: Tue, 30 Apr 2024 13:07:44 +0200 Subject: [PATCH 4/6] Type fixes --- packages/embedder/mergeAsyncIterators.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/embedder/mergeAsyncIterators.ts b/packages/embedder/mergeAsyncIterators.ts index 186074c34b8..fadb0288724 100644 --- a/packages/embedder/mergeAsyncIterators.ts +++ b/packages/embedder/mergeAsyncIterators.ts @@ -14,7 +14,7 @@ type Result> = UnYield[] | []>( iterators: T -): AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result]}[number]> { +) { type ResultThunk = () => [number, Result] let count = iterators.length as number let capability: PromiseCapability | undefined @@ -51,7 +51,7 @@ export function mergeAsyncIterators[] | []>( void getNext(idx, iterable) } - const it = { + const it: AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result]}[number]> = { [Symbol.asyncIterator]: () => it, next: async () => { const nextQueuedResult = queuedResults.shift() @@ -84,7 +84,7 @@ export function mergeAsyncIterators[] | []>( await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) return {done: true as const, value: undefined} }, - throw: async (error) => { + throw: async () => { await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) return {done: true as const, value: undefined} } From fc8b5cf3baa22b6664d26459727a088fc01e0e22 Mon Sep 17 00:00:00 2001 From: Georg Bremer Date: Tue, 30 Apr 2024 17:02:07 +0200 Subject: [PATCH 5/6] Shutdown messages for web server and gql executor --- packages/gql-executor/gqlExecutor.ts | 4 +++- packages/server/server.ts | 18 ++++++++++++------ 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/packages/gql-executor/gqlExecutor.ts b/packages/gql-executor/gqlExecutor.ts index a3ce88d0853..7d97a8145b2 100644 --- a/packages/gql-executor/gqlExecutor.ts +++ b/packages/gql-executor/gqlExecutor.ts @@ -30,13 +30,15 @@ const run = async () => { const executorChannel = GQLExecutorChannelId.join(SERVER_ID!) // on shutdown, remove consumer from the group - process.on('SIGTERM', async () => { + process.on('SIGTERM', async (signal) => { + console.log(`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`) await publisher.xgroup( 'DELCONSUMER', ServerChannel.GQL_EXECUTOR_STREAM, ServerChannel.GQL_EXECUTOR_CONSUMER_GROUP, executorChannel ) + console.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`) process.exit() }) diff --git a/packages/server/server.ts b/packages/server/server.ts index 60db5d0a54b..b5c9e316782 100644 --- a/packages/server/server.ts +++ b/packages/server/server.ts @@ -1,6 +1,7 @@ import tracer from 'dd-trace' import {r} from 'rethinkdb-ts' import uws, {SHARED_COMPRESSOR} from 'uWebSockets.js' +import sleep from '../client/utils/sleep' import ICSHandler from './ICSHandler' import PWAHandler from './PWAHandler' import activeClients from './activeClients' @@ -37,14 +38,19 @@ if (!__PRODUCTION__) { }) } -process.on('SIGTERM', () => { +process.on('SIGTERM', async (signal) => { + console.log( + `Server ID: ${process.env.SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.` + ) const RECONNECT_WINDOW = 60_000 // ms - Object.values(activeClients.store).forEach((connectionContext) => { - const disconnectIn = ~~(Math.random() * RECONNECT_WINDOW) - setTimeout(() => { + await Promise.allSettled( + Object.values(activeClients.store).map(async (connectionContext) => { + const disconnectIn = Math.floor(Math.random() * RECONNECT_WINDOW) + await sleep(disconnectIn) handleDisconnect(connectionContext) - }, disconnectIn) - }) + }) + ) + console.log(`Server ID: ${process.env.SERVER_ID}. Graceful shutdown complete, exiting.`) }) const PORT = Number(__PRODUCTION__ ? process.env.PORT : process.env.SOCKET_PORT) From 2e15053c80c67ae944cde915332a51f97797179f Mon Sep 17 00:00:00 2001 From: Georg Bremer Date: Thu, 2 May 2024 09:21:50 +0200 Subject: [PATCH 6/6] Prettier and lint-staged for the embedder --- packages/embedder/lint-staged.config.js | 5 +++++ packages/embedder/mergeAsyncIterators.ts | 4 +--- packages/embedder/package.json | 1 + 3 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 packages/embedder/lint-staged.config.js diff --git a/packages/embedder/lint-staged.config.js b/packages/embedder/lint-staged.config.js new file mode 100644 index 00000000000..e308623dbac --- /dev/null +++ b/packages/embedder/lint-staged.config.js @@ -0,0 +1,5 @@ +module.exports = { + '*.{ts,tsx}': ['eslint --fix', 'prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'], + '*.graphql': ['prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'], + '**/*.{ts,tsx}': () => 'tsc --noEmit -p tsconfig.json' +} diff --git a/packages/embedder/mergeAsyncIterators.ts b/packages/embedder/mergeAsyncIterators.ts index fadb0288724..246a19ad4cc 100644 --- a/packages/embedder/mergeAsyncIterators.ts +++ b/packages/embedder/mergeAsyncIterators.ts @@ -12,9 +12,7 @@ type Result> = UnYield[] | []>( - iterators: T -) { +export function mergeAsyncIterators[] | []>(iterators: T) { type ResultThunk = () => [number, Result] let count = iterators.length as number let capability: PromiseCapability | undefined diff --git a/packages/embedder/package.json b/packages/embedder/package.json index 9bb2c2eb416..1026732fbc7 100644 --- a/packages/embedder/package.json +++ b/packages/embedder/package.json @@ -10,6 +10,7 @@ "url": "git+https://github.com/ParabolInc/parabol.git" }, "scripts": { + "precommit": "lint-staged", "lint": "eslint --fix . --ext .ts,.tsx", "lint:check": "eslint . --ext .ts,.tsx", "prettier": "prettier --config ../../.prettierrc --write \"**/*.{ts,tsx}\"",