diff --git a/packages/embedder/EmbeddingsJobQueueStream.ts b/packages/embedder/EmbeddingsJobQueueStream.ts index 49c3e4c9e58..d3337be0636 100644 --- a/packages/embedder/EmbeddingsJobQueueStream.ts +++ b/packages/embedder/EmbeddingsJobQueueStream.ts @@ -13,10 +13,16 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator { } orchestrator: WorkflowOrchestrator + done: boolean + constructor(orchestrator: WorkflowOrchestrator) { this.orchestrator = orchestrator + this.done = false } async next(): Promise> { + if (this.done) { + return {done: true as const, value: undefined} + } const pg = getKysely() const getJob = (isFailed: boolean) => { return pg @@ -57,9 +63,11 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator { } } return() { + this.done = true return Promise.resolve({done: true as const, value: undefined}) } throw(error: any) { + this.done = true return Promise.resolve({done: true as const, value: error}) } } diff --git a/packages/embedder/embedder.ts b/packages/embedder/embedder.ts index e89a7b60a97..8de33d0c9bb 100644 --- a/packages/embedder/embedder.ts +++ b/packages/embedder/embedder.ts @@ -48,10 +48,11 @@ const run = async () => { const streams = mergeAsyncIterators(jobQueueStreams) const kill: NodeJS.SignalsListener = (signal) => { - Logger.log(`Kill signal received: ${signal}`) - primaryLock?.release() + Logger.log( + `Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.` + ) + primaryLock?.release().catch(() => {}) streams.return?.() - process.exit() } process.on('SIGTERM', kill) process.on('SIGINT', kill) @@ -66,7 +67,8 @@ const run = async () => { } // On graceful shutdown - Logger.log('Streaming Complete. Goodbye!') + Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`) + process.exit() } run() diff --git a/packages/embedder/lint-staged.config.js b/packages/embedder/lint-staged.config.js new file mode 100644 index 00000000000..e308623dbac --- /dev/null +++ b/packages/embedder/lint-staged.config.js @@ -0,0 +1,5 @@ +module.exports = { + '*.{ts,tsx}': ['eslint --fix', 'prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'], + '*.graphql': ['prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'], + '**/*.{ts,tsx}': () => 'tsc --noEmit -p tsconfig.json' +} diff --git a/packages/embedder/logMemoryUse.ts b/packages/embedder/logMemoryUse.ts index afe3259aee5..46ae3b03968 100644 --- a/packages/embedder/logMemoryUse.ts +++ b/packages/embedder/logMemoryUse.ts @@ -6,5 +6,5 @@ export const logMemoryUse = () => { const {rss} = memoryUsage const usedMB = Math.floor(rss / MB) console.log('Memory use:', usedMB, 'MB') - }, 10000) + }, 10000).unref() } diff --git a/packages/embedder/logPerformance.ts b/packages/embedder/logPerformance.ts index 96742475db5..af4db92945b 100644 --- a/packages/embedder/logPerformance.ts +++ b/packages/embedder/logPerformance.ts @@ -14,6 +14,6 @@ export const logPerformance = (logEvery: number, resetEvery: number) => { logs = 0 start = performance.now() } - }, logEvery * 1000) + }, logEvery * 1000).unref() return counter } diff --git a/packages/embedder/mergeAsyncIterators.ts b/packages/embedder/mergeAsyncIterators.ts index e274e0cca6f..246a19ad4cc 100644 --- a/packages/embedder/mergeAsyncIterators.ts +++ b/packages/embedder/mergeAsyncIterators.ts @@ -12,85 +12,80 @@ type Result> = UnYield[] | []>( - iterators: T -): AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result]}[number]> { - return (async function* () { - type ResultThunk = () => [number, Result] - let count = iterators.length as number - let capability: PromiseCapability | undefined - const queuedResults: ResultThunk[] = [] - const getNext = async (idx: number, iterator: T[number]) => { - try { - const next = await iterator.next() - if (next.done) { - if (--count === 0 && capability !== undefined) { - capability.resolve(null) - } - } else { - resolveResult(() => { - void getNext(idx, iterator) - return [idx, next.value] - }) +export function mergeAsyncIterators[] | []>(iterators: T) { + type ResultThunk = () => [number, Result] + let count = iterators.length as number + let capability: PromiseCapability | undefined + const queuedResults: ResultThunk[] = [] + const getNext = async (idx: number, iterator: T[number]) => { + try { + const next = await iterator.next() + if (next.done) { + if (--count === 0 && capability !== undefined) { + capability.resolve(null) } - } catch (error) { + } else { resolveResult(() => { - throw error + void getNext(idx, iterator) + return [idx, next.value] }) } + } catch (error) { + resolveResult(() => { + throw error + }) } - const resolveResult = (resultThunk: ResultThunk) => { - if (capability === undefined) { - queuedResults.push(resultThunk) - } else { - capability.resolve(resultThunk) - } + } + const resolveResult = (resultThunk: ResultThunk) => { + if (capability === undefined) { + queuedResults.push(resultThunk) + } else { + capability.resolve(resultThunk) } + } - try { - // Begin all iterators - for (const [idx, iterable] of iterators.entries()) { - void getNext(idx, iterable) + // Begin all iterators + for (const [idx, iterable] of iterators.entries()) { + void getNext(idx, iterable) + } + + const it: AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result]}[number]> = { + [Symbol.asyncIterator]: () => it, + next: async () => { + const nextQueuedResult = queuedResults.shift() + if (nextQueuedResult !== undefined) { + return {done: false as const, value: nextQueuedResult()} + } + if (count === 0) { + return {done: true as const, value: undefined} } - // Delegate to iterables as results complete - while (true) { - while (true) { - const nextQueuedResult = queuedResults.shift() - if (nextQueuedResult === undefined) { - break - } else { - yield nextQueuedResult() - } - } - if (count === 0) { - break - } else { - // Promise.withResolvers() is not yet implemented in node - capability = { - resolve: undefined as any, - reject: undefined as any, - promise: undefined as any - } - capability.promise = new Promise((res, rej) => { - capability!.resolve = res - capability!.reject = rej - }) - const nextResult = await capability.promise - if (nextResult === null) { - break - } else { - capability = undefined - yield nextResult() - } - } + // Promise.withResolvers() is not yet implemented in node + capability = { + resolve: undefined as any, + reject: undefined as any, + promise: undefined as any + } + capability.promise = new Promise((res, rej) => { + capability!.resolve = res + capability!.reject = rej + }) + const nextResult = await capability.promise + if (nextResult === null) { + return {done: true as const, value: undefined} + } else { + capability = undefined + return {done: false as const, value: nextResult()} } - } catch (err) { - // Unwind remaining iterators on failure - try { - await Promise.all(iterators.map((iterator) => iterator.return?.())) - } catch {} - throw err + }, + return: async () => { + await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) + return {done: true as const, value: undefined} + }, + throw: async () => { + await Promise.allSettled(iterators.map((iterator) => iterator.return?.())) + return {done: true as const, value: undefined} } - })() + } + return it } diff --git a/packages/embedder/package.json b/packages/embedder/package.json index 9bb2c2eb416..1026732fbc7 100644 --- a/packages/embedder/package.json +++ b/packages/embedder/package.json @@ -10,6 +10,7 @@ "url": "git+https://github.com/ParabolInc/parabol.git" }, "scripts": { + "precommit": "lint-staged", "lint": "eslint --fix . --ext .ts,.tsx", "lint:check": "eslint . --ext .ts,.tsx", "prettier": "prettier --config ../../.prettierrc --write \"**/*.{ts,tsx}\"", diff --git a/packages/embedder/resetStalledJobs.ts b/packages/embedder/resetStalledJobs.ts index 592b468faee..7269a3ce519 100644 --- a/packages/embedder/resetStalledJobs.ts +++ b/packages/embedder/resetStalledJobs.ts @@ -14,5 +14,5 @@ export const resetStalledJobs = () => { })) .where('startAt', '<', new Date(Date.now() - ms('5m'))) .execute() - }, ms('5m')) + }, ms('5m')).unref() } diff --git a/packages/gql-executor/gqlExecutor.ts b/packages/gql-executor/gqlExecutor.ts index a3ce88d0853..7d97a8145b2 100644 --- a/packages/gql-executor/gqlExecutor.ts +++ b/packages/gql-executor/gqlExecutor.ts @@ -30,13 +30,15 @@ const run = async () => { const executorChannel = GQLExecutorChannelId.join(SERVER_ID!) // on shutdown, remove consumer from the group - process.on('SIGTERM', async () => { + process.on('SIGTERM', async (signal) => { + console.log(`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`) await publisher.xgroup( 'DELCONSUMER', ServerChannel.GQL_EXECUTOR_STREAM, ServerChannel.GQL_EXECUTOR_CONSUMER_GROUP, executorChannel ) + console.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`) process.exit() }) diff --git a/packages/server/server.ts b/packages/server/server.ts index 60db5d0a54b..b5c9e316782 100644 --- a/packages/server/server.ts +++ b/packages/server/server.ts @@ -1,6 +1,7 @@ import tracer from 'dd-trace' import {r} from 'rethinkdb-ts' import uws, {SHARED_COMPRESSOR} from 'uWebSockets.js' +import sleep from '../client/utils/sleep' import ICSHandler from './ICSHandler' import PWAHandler from './PWAHandler' import activeClients from './activeClients' @@ -37,14 +38,19 @@ if (!__PRODUCTION__) { }) } -process.on('SIGTERM', () => { +process.on('SIGTERM', async (signal) => { + console.log( + `Server ID: ${process.env.SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.` + ) const RECONNECT_WINDOW = 60_000 // ms - Object.values(activeClients.store).forEach((connectionContext) => { - const disconnectIn = ~~(Math.random() * RECONNECT_WINDOW) - setTimeout(() => { + await Promise.allSettled( + Object.values(activeClients.store).map(async (connectionContext) => { + const disconnectIn = Math.floor(Math.random() * RECONNECT_WINDOW) + await sleep(disconnectIn) handleDisconnect(connectionContext) - }, disconnectIn) - }) + }) + ) + console.log(`Server ID: ${process.env.SERVER_ID}. Graceful shutdown complete, exiting.`) }) const PORT = Number(__PRODUCTION__ ? process.env.PORT : process.env.SOCKET_PORT)