From 655c1c92329ab7b13b79455ec68824a32d0d38fa Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Mon, 24 Dec 2018 07:31:06 +0800 Subject: [PATCH] process: move worker bootstrap code into worker_thread_only.js Move worker bootstrap code into worker_thread_only.js from internal/worker.js since they are only run once during bootstrap. PR-URL: https://github.com/nodejs/node/pull/25199 Reviewed-By: James M Snell --- lib/internal/process/worker_thread_only.js | 97 ++++++++++++++++++++-- lib/internal/worker.js | 76 +---------------- 2 files changed, 90 insertions(+), 83 deletions(-) diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js index a9332fb4277363..d26159ab451c97 100644 --- a/lib/internal/process/worker_thread_only.js +++ b/lib/internal/process/worker_thread_only.js @@ -7,18 +7,21 @@ const { threadId } = internalBinding('worker'); -const debug = require('util').debuglog('worker'); - const { + messageTypes, + kStdioWantsMoreDataCallback, kWaitingStreams, ReadableWorkerStdio, WritableWorkerStdio } = require('internal/worker/io'); -const { - createMessageHandler, - createWorkerFatalExeception -} = require('internal/worker'); +let debuglog; +function debug(...args) { + if (!debuglog) { + debuglog = require('util').debuglog('worker'); + } + return debuglog(...args); +} const workerStdio = {}; @@ -36,12 +39,90 @@ function initializeWorkerStdio() { }; } +function createMessageHandler(port) { + const publicWorker = require('worker_threads'); + + return function(message) { + if (message.type === messageTypes.LOAD_SCRIPT) { + const { filename, doEval, workerData, publicPort, hasStdin } = message; + publicWorker.parentPort = publicPort; + publicWorker.workerData = workerData; + + if (!hasStdin) + workerStdio.stdin.push(null); + + debug(`[${threadId}] starts worker script ${filename} ` + + `(eval = ${eval}) at cwd = ${process.cwd()}`); + port.unref(); + port.postMessage({ type: messageTypes.UP_AND_RUNNING }); + if (doEval) { + const { evalScript } = require('internal/process/execution'); + evalScript('[worker eval]', filename); + } else { + process.argv[1] = filename; // script filename + require('module').runMain(); + } + return; + } else if (message.type === messageTypes.STDIO_PAYLOAD) { + const { stream, chunk, encoding } = message; + workerStdio[stream].push(chunk, encoding); + return; + } else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) { + const { stream } = message; + workerStdio[stream][kStdioWantsMoreDataCallback](); + return; + } + + require('assert').fail(`Unknown worker message type ${message.type}`); + }; +} + +// XXX(joyeecheung): this has to be returned as an anonymous function +// wrapped in a closure, see the comment of the original +// process._fatalException in lib/internal/process/execution.js +function createWorkerFatalExeception(port) { + const { + fatalException: originalFatalException + } = require('internal/process/execution'); + + return (error) => { + debug(`[${threadId}] gets fatal exception`); + let caught = false; + try { + caught = originalFatalException.call(this, error); + } catch (e) { + error = e; + } + debug(`[${threadId}] fatal exception caught = ${caught}`); + + if (!caught) { + let serialized; + try { + const { serializeError } = require('internal/error-serdes'); + serialized = serializeError(error); + } catch {} + debug(`[${threadId}] fatal exception serialized = ${!!serialized}`); + if (serialized) + port.postMessage({ + type: messageTypes.ERROR_MESSAGE, + error: serialized + }); + else + port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR }); + + const { clearAsyncIdStack } = require('internal/async_hooks'); + clearAsyncIdStack(); + + process.exit(); + } + }; +} + function setup() { debug(`[${threadId}] is setting up worker child environment`); const port = getEnvMessagePort(); - const publicWorker = require('worker_threads'); - port.on('message', createMessageHandler(publicWorker, port, workerStdio)); + port.on('message', createMessageHandler(port)); port.start(); return { diff --git a/lib/internal/worker.js b/lib/internal/worker.js index c4393d0459cf7e..2e1568a73822b9 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -10,7 +10,6 @@ const { ERR_WORKER_UNSUPPORTED_EXTENSION, } = require('internal/errors').codes; const { validateString } = require('internal/validators'); -const { clearAsyncIdStack } = require('internal/async_hooks'); const { drainMessagePort, @@ -24,7 +23,7 @@ const { ReadableWorkerStdio, WritableWorkerStdio, } = require('internal/worker/io'); -const { serializeError, deserializeError } = require('internal/error-serdes'); +const { deserializeError } = require('internal/error-serdes'); const { pathToFileURL } = require('url'); const { @@ -219,77 +218,6 @@ class Worker extends EventEmitter { } } -function createMessageHandler(publicWorker, port, workerStdio) { - return function(message) { - if (message.type === messageTypes.LOAD_SCRIPT) { - const { filename, doEval, workerData, publicPort, hasStdin } = message; - publicWorker.parentPort = publicPort; - publicWorker.workerData = workerData; - - if (!hasStdin) - workerStdio.stdin.push(null); - - debug(`[${threadId}] starts worker script ${filename} ` + - `(eval = ${eval}) at cwd = ${process.cwd()}`); - port.unref(); - port.postMessage({ type: messageTypes.UP_AND_RUNNING }); - if (doEval) { - const { evalScript } = require('internal/process/execution'); - evalScript('[worker eval]', filename); - } else { - process.argv[1] = filename; // script filename - require('module').runMain(); - } - return; - } else if (message.type === messageTypes.STDIO_PAYLOAD) { - const { stream, chunk, encoding } = message; - workerStdio[stream].push(chunk, encoding); - return; - } else if (message.type === messageTypes.STDIO_WANTS_MORE_DATA) { - const { stream } = message; - workerStdio[stream][kStdioWantsMoreDataCallback](); - return; - } - - assert.fail(`Unknown worker message type ${message.type}`); - }; -} - -function createWorkerFatalExeception(port) { - const { - fatalException: originalFatalException - } = require('internal/process/execution'); - - return function(error) { - debug(`[${threadId}] gets fatal exception`); - let caught = false; - try { - caught = originalFatalException.call(this, error); - } catch (e) { - error = e; - } - debug(`[${threadId}] fatal exception caught = ${caught}`); - - if (!caught) { - let serialized; - try { - serialized = serializeError(error); - } catch {} - debug(`[${threadId}] fatal exception serialized = ${!!serialized}`); - if (serialized) - port.postMessage({ - type: messageTypes.ERROR_MESSAGE, - error: serialized - }); - else - port.postMessage({ type: messageTypes.COULD_NOT_SERIALIZE_ERROR }); - clearAsyncIdStack(); - - process.exit(); - } - }; -} - function pipeWithoutWarning(source, dest) { const sourceMaxListeners = source._maxListeners; const destMaxListeners = dest._maxListeners; @@ -303,8 +231,6 @@ function pipeWithoutWarning(source, dest) { } module.exports = { - createMessageHandler, - createWorkerFatalExeception, threadId, Worker, isMainThread