diff --git a/Makefile b/Makefile index fb978380e5..850db10783 100644 --- a/Makefile +++ b/Makefile @@ -465,6 +465,8 @@ test-with-async-hooks: $(CI_JS_SUITES) \ $(CI_NATIVE_SUITES) +test-worker: + $(PYTHON) tools/test.py --mode=release workers ifneq ("","$(wildcard deps/v8/tools/run-tests.py)") test-v8: v8 diff --git a/benchmark/fixtures/echo.worker.js b/benchmark/fixtures/echo.worker.js new file mode 100644 index 0000000000..61ee804815 --- /dev/null +++ b/benchmark/fixtures/echo.worker.js @@ -0,0 +1,7 @@ +'use strict'; + +const worker = require('worker'); + +worker.on('workerMessage', (msg) => { + worker.postMessage(msg); +}); diff --git a/benchmark/worker/echo.js b/benchmark/worker/echo.js new file mode 100644 index 0000000000..e83b525b67 --- /dev/null +++ b/benchmark/worker/echo.js @@ -0,0 +1,72 @@ +'use strict'; + +const { Worker } = require('worker'); +const common = require('../common.js'); +const path = require('path'); +const bench = common.createBenchmark(main, { + workers: [1], + payload: ['string', 'object'], + sendsPerBroadcast: [1, 10], + n: [1e5] +}); + +const workerPath = path.resolve(__dirname, '..', 'fixtures', 'echo.worker.js'); + +function main(conf) { + const n = +conf.n; + const workers = +conf.workers; + const sends = +conf.sendsPerBroadcast; + const expectedPerBroadcast = sends * workers; + var payload; + var readies = 0; + var broadcasts = 0; + var msgCount = 0; + + switch (conf.payload) { + case 'string': + payload = 'hello world!'; + break; + case 'object': + payload = { action: 'pewpewpew', powerLevel: 9001 }; + break; + default: + throw new Error('Unsupported payload type'); + } + + const workerObjs = []; + + for (var i = 0; i < workers; ++i) { + const worker = new Worker(workerPath); + workerObjs.push(worker); + worker.on('online', onOnline); + worker.on('message', onMessage); + } + + function onOnline() { + if (++readies === workers) { + bench.start(); + broadcast(); + } + } + + function broadcast() { + if (broadcasts++ === n) { + bench.end(n); + for (const worker of workerObjs) { + worker.unref(); + } + return; + } + for (const worker of workerObjs) { + for (var i = 0; i < sends; ++i) + worker.postMessage(payload); + } + } + + function onMessage() { + if (++msgCount === expectedPerBroadcast) { + msgCount = 0; + broadcast(); + } + } +} diff --git a/common.gypi b/common.gypi index 98268068f9..0b1db8e2e6 100644 --- a/common.gypi +++ b/common.gypi @@ -28,6 +28,11 @@ # Enable disassembler for `--print-code` v8 options 'v8_enable_disassembler': 1, + 'v8_extra_library_files': [ + './lib/extras/events.js', + './lib/extras/messaging.js' + ], + # Don't bake anything extra into the snapshot. 'v8_use_external_startup_data%': 0, diff --git a/doc/api/_toc.md b/doc/api/_toc.md index b3987ed8e4..36f528132b 100644 --- a/doc/api/_toc.md +++ b/doc/api/_toc.md @@ -50,6 +50,7 @@ * [Utilities](util.html) * [V8](v8.html) * [VM](vm.html) +* [Worker](worker.html) * [ZLIB](zlib.html)
diff --git a/doc/api/domain.md b/doc/api/domain.md index 30e93e2bea..940831d0f0 100644 --- a/doc/api/domain.md +++ b/doc/api/domain.md @@ -31,6 +31,8 @@ will be notified, rather than losing the context of the error in the `process.on('uncaughtException')` handler, or causing the program to exit immediately with an error code. +*Note*: This module is not available in [`Worker`][]s. + ## Warning: Don't Ignore Errors! @@ -495,3 +497,4 @@ rejections. [`setInterval()`]: timers.html#timers_setinterval_callback_delay_args [`setTimeout()`]: timers.html#timers_settimeout_callback_delay_args [`throw`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/throw +[`Worker`]: #worker_class_worker diff --git a/doc/api/process.md b/doc/api/process.md index 8b8ce0f43d..c17bd42d17 100644 --- a/doc/api/process.md +++ b/doc/api/process.md @@ -415,6 +415,8 @@ added: v0.7.0 The `process.abort()` method causes the Node.js process to exit immediately and generate a core file. +*Note*: This feature is not available in [`Worker`][] threads. + ## process.arch + +* `port` {MessagePort} +* `contextifiedSandbox` {Object} A contextified object as returned by the + `vm.createContext()` method. +* Returns: {MessagePort} + +Bind a `MessagePort` to a specific VM context. This returns a new `MessagePort` +object, whose prototype and methods act as if they were created in the passed +context. The received messages will also be emitted as objects from the passed +context. + +The `port` object on which this method was called can not be used for sending +or receiving further messages. + ## vm.runInDebugContext(code) + +> Stability: 1 - Experimental + +The `worker` module provides a way to create multiple environments running +on independent threads, and to create message channels between those. It +can be accessed using: + +```js +const worker = require('worker'); +``` + +Workers are useful for performing CPU-intensive JavaScript operations; do not +use them for I/O, since Ayo’s built-in mechanisms for performing operations +asynchronously already treat it more efficiently than Worker threads can. + +Workers can also, unlike child processes or when using the `cluster` module, +share memory efficiently by transferring `ArrayBuffer` instances or sharing +`SharedArrayBuffer` instances between them. + +## Example + +```js +const { Worker, isMainThread, postMessage, workerData } = require('worker'); + +if (isMainThread) { + module.exports = async function parseJSAsync(script) { + return new Promise((resolve, reject) => { + const worker = new Worker(__filename, { + workerData: script + }); + worker.on('message', resolve); + worker.on('error', reject); + worker.on('exit', (code) => { + if (code !== 0) + reject(new Error(`Worker stopped with exit code ${code}`)); + }); + }); + }; +} else { + const { parse } = require('some-js-parsing-library'); + const script = workerData; + postMessage(parse(script)); +} +``` + +## Event: 'workerMessage' + + +If this thread was spawned by a `Worker`, this event emits messages sent using +[`worker.postMessage()`][]. See the [`port.on('message')`][] event for +more details. + +Listeners on this event will receive a clone of the `value` parameter as passed +to `postMessage()` and no further arguments. + +*Note*: This event is emitted on the `worker` module as returned by +`require('worker')` itself. + +## worker.isMainThread + + +* {boolean} + +Is `true` if this code is not running inside of a [`Worker`][] thread. + +## worker.postMessage(value[, transferList]) + + +* `value` {any} +* `transferList` {Object[]} + +* Returns: {undefined} + +This method is available if this is a `Worker` thread, which can be tested +using [`require('worker').isMainThread`][]. + +Send a message to the parent thread’s `Worker` instance that will be received +via [`worker.on('message')`][]. See [`port.postMessage()`][] for +more details. + +### worker.ref() + + +Opposite of `unref`, calling `ref` on a previously `unref`d worker will *not* +let the program exit if it's the only active handle left (the default behavior). +If the worker is `ref`d calling `ref` again will have no effect. + +### worker.unref() + + +Calling `unref` on a worker will allow the thread to exit if this is the only +active handle in the event system. If the worker is already `unref`d calling +`unref` again will have no effect. + +## worker.threadId + + +* {integer} + +An integer identifier for the current thread. On the corresponding worker object +(if there is any), it is available as [`worker.threadId`][]. + +## worker.workerData + + +An arbitrary JavaScript value that contains a clone of the data passed +to this thread’s `Worker` constructor. + + +## Class: MessageChannel + + + +Instances of the `worker.MessageChannel` class represent an asynchronous, +two-way communications channel. +The `MessageChannel` has no methods of its own. `new MessageChannel()` +yields an object with `port1` and `port2` properties, which refer to linked +[`MessagePort`][] instances. + +```js +const { MessageChannel } = require('worker'); + +const { port1, port2 } = new MessageChannel(); +port1.on('message', (message) => console.log('received', message)); +port2.postMessage({ foo: 'bar' }); +// prints: received { foo: 'bar' } +``` + +## Class: MessagePort + + +* Extends: {EventEmitter} + +Instances of the `worker.MessagePort` class represent one end of an +asynchronous, two-way communications channel. It can be used to transfer +structured data, memory regions and other `MessagePort`s between different +[`Worker`][]s or [`vm` context][vm]s. + +For transferring `MessagePort` instances between VM contexts, see +[`vm.moveMessagePortToContext()`][]. + +*Note*: With the exception of `MessagePort`s being [`EventEmitter`][]s rather +than `EventTarget`s, this implementation matches [browser `MessagePort`][]s. + +### Event: 'close' + + +The `'close'` event is emitted once either side of the channel has been +disconnected. + +### Event: 'error' + + +The `'error'` event is emitted if the worker thread throws an uncaught +expection. In that case, the worker will be terminated. + +### Event: 'message' + + +* `value` {any} The transmitted value + +The `'message'` event is emitted for any incoming message, containing the cloned +input of [`port.postMessage()`][]. + +Listeners on this event will receive a clone of the `value` parameter as passed +to `postMessage()` and no further arguments. + +### port.close() + + +* Returns: {undefined} + +Disables further sending of messages on either side of the connection. +You should call this method once you know that no further communication +will happen over this `MessagePort`. + +### port.postMessage(value[, transferList]) + + +* `value` {any} +* `transferList` {Object[]} + +* Returns: {undefined} + +Sends a JavaScript value to the receiving side of this channel. +`value` will be transferred in a way +that is compatible with the [HTML structured clone algorithm][]. In particular, +it may contain circular references and objects like typed arrays that `JSON` +is not able to serialize. + +`transferList` may be a list of `ArrayBuffer` and `MessagePort` objects. +After transferring, they will not be usable on the sending side of the channel +anymore (even if they are not contained in `value`). + +If `value` contains [`SharedArrayBuffer`][] instances, those will be accessible +from either thread. + +`value` may still contain `ArrayBuffer` instances that are not in +`transferList`; in that case, the underlying memory is copied rather than moved. + +For more information on the serialization and deserialization mechanisms +behind this API, see the [serialization API of the `v8` module][v8.serdes]. + +*Note*: Because the object cloning uses the structured clone algorithm, +non-enumberable properties, accessors, and prototypes are not preserved. +In particular, [`Buffer`][] objects will be read as plain [`Uint8Array`][]s +on the receiving side. + +### port.ref() + + +Opposite of `unref`, calling `ref` on a previously `unref`d port will *not* +let the program exit if it's the only active handle left (the default behavior). +If the port is `ref`d calling `ref` again will have no effect. + +### port.unref() + + +Calling `unref` on a port will allow the thread to exit if this is the only +active handle in the event system. If the port is already `unref`d calling +`unref` again will have no effect. + +### port.start() + + +* Returns: {undefined} + +Starts receiving messages on this `MessagePort`. When using this port +as an event emitter, this will be called automatically once `'message'` +listeners are attached. This means that this method does not need to be used +unless you are using [`vm.moveMessagePortToContext()`][] to move this `port` +into another VM context. + +## Class: Worker + + +The `Worker` class represents an independent JavaScript execution thread. +Most Ayo APIs are available inside of it. + +Notable differences inside a Worker environment are: + +- The [`process.stdin`][], [`process.stdout`][] and [`process.stderr`][] + properties are set to `null`. +- The [`domain`][] module is not usable inside of workers. +- The [`require('worker').isMainThread`][] property is set to `false`. +- The [`require('worker').postMessage()`][] method is available and the + [`require('worker').on('workerMessage')`][] event is being emitted. +- [`process.exit()`][] does not stop the whole program, just the single thread, + and [`process.abort()`][] is not available. +- [`process.chdir()`][] as well as `process` methods that set group or user ids + are not available. +- [`process.env`][] is a read-only reference to the environment variables. +- [`process.title`][] can not be modified. +- Native addons that were not build with explicit `Worker` support can not be + loaded. +- Execution may stop at any point as a result of the [`worker.terminate()`][] + method being invoked. +- IPC channels from parent processes are not accessible. + +Creating `Worker` instances inside of other `Worker`s is permitted. + +Like [Web Workers][] and the [`cluster` module][], two-way communication can be +achieved through inter-thread message passing. Internally, a `Worker` has a +built-in pair of [`MessagePort`][]s that are already associated with each other +when the `Worker` is created. While the `MessagePort` objects are not directly +exposed, their functionalities are exposed through [`worker.postMessage()`][] +and the [`worker.on('message')`][] event on the `Worker` object for the parent +thread, and [`require('worker').postMessage()`][] and the +[`require('worker').on('workerMessage')`][] on `require('worker')` for the +child thread. + +To create custom messaging channels (which is encouraged over using the default +global channel because it facilitates seperation of concerns), users can create +a `MessageChannel` object on either thread and pass one of the +`MessagePort`s on that `MessageChannel` to the other thread through a +pre-existing channel, such as the global one. + +See [`port.postMessage()`][] for more information on how messages are passed, +and what kind of JavaScript values can be successfully transported through +the thread barrier. + +For example: + +```js +const assert = require('assert'); +const { Worker, MessageChannel, MessagePort, isMainThread } = require('worker'); +if (isMainThread) { + const worker = new Worker(__filename); + const subChannel = new MessageChannel(); + worker.postMessage({ hereIsYourPort: subChannel.port1 }, [subChannel.port1]); + subChannel.port2.on('message', (value) => { + console.log('received:', value); + }); +} else { + require('worker').once('workerMessage', (value) => { + assert(value.hereIsYourPort instanceof MessagePort); + value.hereIsYourPort.postMessage('the worker is sending this'); + value.hereIsYourPort.close(); + }); +} +``` + +### new Worker(filename, options) + +* filename {string} The absolute path to the Worker’s main script. + If `options.eval` is true, this is a string containing JavaScript code rather + than a path. +* options {Object} + * eval {boolean} If true, interpret the first argument to the constructor + as a script that is executed once the worker is online. + * data {any} Any JavaScript value that will be cloned and made + available as [`require('worker').workerData`][]. The cloning will occur as + described in the [HTML structured clone algorithm][], and an error will be + thrown if the object can not be cloned (e.g. because it contains + `function`s). + * maxSemiSpaceSize {integer} An optional memory limit in MB for the thread’s + heap’s semi-space, which contains most short-lived objects. + * maxOldSpaceSize {integer} An optional memory limit in MB for the thread’s + main heap. + +### Event: 'exit' + + +* `exitCode` {integer} + +The `'exit'` event is emitted once the worker has stopped. If the worker +exited by calling [`process.exit()`][], the `exitCode` parameter will be the +passed exit code. If the worker was terminated, the `exitCode` parameter will +be `1`. + +### Event: 'message' + + +* `value` {any} The transmitted value + +The `'message'` event is emitted when the worker thread has invoked +[`require('worker').postMessage()`][]. See the [`port.on('message')`][] event +for more details. + +### Event: 'online' + + +The `'online'` event is emitted when the worker thread has started executing +JavaScript code. + +### worker.postMessage(value[, transferList]) + + +* `value` {any} +* `transferList` {Object[]} + +Send a message to the worker that will be received via +[`require('worker').on('workerMessage')`][]. See [`port.postMessage()`][] for +more details. + +### worker.terminate([callback]) + + +* `callback` {Function} + +Stop all JavaScript execution in the worker thread as soon as possible. +`callback` is an optional function that is invoked once this operation is known +to have completed. + +*Note*: Currently, not all code in the internals of Ayo.js is prepared to expect +termination at arbitrary points in time and may crash if it encounters that +condition. Consequently, you should currently only call `.terminate()` if +it is known that the Worker thread is not accessing Ayo.js core modules other +than what is exposed in the `worker` module. + +### worker.threadId + + +* {integer} + +An integer identifier for the referenced thread. Inside the worker thread, +it is available as [`require('worker').threadId`][]. + +[`Buffer`]: buffer.html +[`EventEmitter`]: events.html +[`MessagePort`]: #worker_class_messageport +[`port.postMessage()`]: #worker_port_postmessage_value_transferlist +[`Worker`]: #worker_class_worker +[`worker.terminate()`]: #worker_worker_terminate_callback +[`worker.postMessage()`]: #worker_worker_postmessage_value_transferlist_1 +[`worker.on('message')`]: #worker_event_message_1 +[`worker.threadId`]: #worker_worker_threadid_1 +[`port.postMessage()`]: #worker_port_postmessage_value_transferlist +[`port.on('message')`]: #worker_event_message +[`process.exit()`]: process.html#process_process_exit +[`process.exit()`]: process.html#process_process_exit +[`process.abort()`]: process.html#process_process_abort +[`process.chdir()`]: process.html#process_process_chdir_directory +[`process.env`]: process.html#process_process_env +[`process.stdin`]: process.html#process_process_stdin +[`process.stderr`]: process.html#process_process_stderr +[`process.stdout`]: process.html#process_process_stdout +[`process.title`]: process.html#process_process_title +[`require('worker').workerData`]: #worker_worker_workerdata +[`require('worker').on('workerMessage')`]: #worker_event_workermessage +[`require('worker').postMessage()`]: #worker_worker_postmessage_value_transferlist +[`require('worker').isMainThread`]: #worker_worker_is_main_thread +[`require('worker').threadId`]: #worker_worker_threadid +[`domain`]: domain.html +[`vm.moveMessagePortToContext()`]: vm.html#vm_vm_movemessageporttocontext_port_context +[`cluster` module]: cluster.html +[vm]: vm.html#vm_vm_executing_javascript +[v8.serdes]: v8.html#v8_serialization_api +[`SharedArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer +[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array +[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort +[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm +[Web Workers]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API + diff --git a/lib/console.js b/lib/console.js index 54c8aba829..ea19fbd54b 100644 --- a/lib/console.js +++ b/lib/console.js @@ -245,7 +245,12 @@ Console.prototype.groupEnd = function groupEnd() { this[kGroupIndent].slice(0, this[kGroupIndent].length - 2); }; -module.exports = new Console(process.stdout, process.stderr); +if (isMainThread) { + module.exports = new Console(process.stdout, process.stderr); +} else { + const { SyncWriteStream } = require('internal/fs'); + module.exports = new Console(new SyncWriteStream(1), new SyncWriteStream(2)); +} module.exports.Console = Console; function noop() {} diff --git a/lib/events.js b/lib/events.js index 1414a1429d..06e0209a5c 100644 --- a/lib/events.js +++ b/lib/events.js @@ -21,51 +21,9 @@ 'use strict'; -var domain; - -function EventEmitter() { - EventEmitter.init.call(this); -} -module.exports = EventEmitter; - -// Backwards-compat with node 0.10.x -EventEmitter.EventEmitter = EventEmitter; - -EventEmitter.usingDomains = false; - -EventEmitter.prototype.domain = undefined; -EventEmitter.prototype._events = undefined; -EventEmitter.prototype._maxListeners = undefined; - -// By default EventEmitters will print a warning if more than 10 listeners are -// added to it. This is a useful default which helps finding memory leaks. -var defaultMaxListeners = 10; +var { EventEmitter } = process.binding('extras').binding; -var errors; -function lazyErrors() { - if (errors === undefined) - errors = require('internal/errors'); - return errors; -} - -Object.defineProperty(EventEmitter, 'defaultMaxListeners', { - enumerable: true, - get: function() { - return defaultMaxListeners; - }, - set: function(arg) { - // force global console to be compiled. - // see https://github.com/nodejs/node/issues/4467 - console; - // check whether the input is a positive number (whose value is zero or - // greater and not a NaN). - if (typeof arg !== 'number' || arg < 0 || arg !== arg) { - const errors = lazyErrors(); - throw new errors.TypeError('ERR_OUT_OF_RANGE', 'defaultMaxListeners'); - } - defaultMaxListeners = arg; - } -}); +var domain; EventEmitter.init = function() { this.domain = null; @@ -85,468 +43,4 @@ EventEmitter.init = function() { this._maxListeners = this._maxListeners || undefined; }; -// Obviously not all Emitters should be limited to 10. This function allows -// that to be increased. Set to zero for unlimited. -EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) { - if (typeof n !== 'number' || n < 0 || isNaN(n)) { - const errors = lazyErrors(); - throw new errors.TypeError('ERR_OUT_OF_RANGE', 'n'); - } - this._maxListeners = n; - return this; -}; - -function $getMaxListeners(that) { - if (that._maxListeners === undefined) - return EventEmitter.defaultMaxListeners; - return that._maxListeners; -} - -EventEmitter.prototype.getMaxListeners = function getMaxListeners() { - return $getMaxListeners(this); -}; - -// These standalone emit* functions are used to optimize calling of event -// handlers for fast cases because emit() itself often has a variable number of -// arguments and can be deoptimized because of that. These functions always have -// the same number of arguments and thus do not get deoptimized, so the code -// inside them can execute faster. -function emitNone(handler, isFn, self) { - if (isFn) - handler.call(self); - else { - var len = handler.length; - var listeners = arrayClone(handler, len); - for (var i = 0; i < len; ++i) - listeners[i].call(self); - } -} -function emitOne(handler, isFn, self, arg1) { - if (isFn) - handler.call(self, arg1); - else { - var len = handler.length; - var listeners = arrayClone(handler, len); - for (var i = 0; i < len; ++i) - listeners[i].call(self, arg1); - } -} -function emitTwo(handler, isFn, self, arg1, arg2) { - if (isFn) - handler.call(self, arg1, arg2); - else { - var len = handler.length; - var listeners = arrayClone(handler, len); - for (var i = 0; i < len; ++i) - listeners[i].call(self, arg1, arg2); - } -} -function emitThree(handler, isFn, self, arg1, arg2, arg3) { - if (isFn) - handler.call(self, arg1, arg2, arg3); - else { - var len = handler.length; - var listeners = arrayClone(handler, len); - for (var i = 0; i < len; ++i) - listeners[i].call(self, arg1, arg2, arg3); - } -} - -function emitMany(handler, isFn, self, args) { - if (isFn) - handler.apply(self, args); - else { - var len = handler.length; - var listeners = arrayClone(handler, len); - for (var i = 0; i < len; ++i) - listeners[i].apply(self, args); - } -} - -EventEmitter.prototype.emit = function emit(type) { - var er, handler, len, args, i, events, domain; - var needDomainExit = false; - var doError = (type === 'error'); - - events = this._events; - if (events) - doError = (doError && events.error == null); - else if (!doError) - return false; - - domain = this.domain; - - // If there is no 'error' event listener then throw. - if (doError) { - if (arguments.length > 1) - er = arguments[1]; - if (domain) { - if (!er) { - const errors = lazyErrors(); - er = new errors.Error('ERR_UNHANDLED_ERROR'); - } - if (typeof er === 'object' && er !== null) { - er.domainEmitter = this; - er.domain = domain; - er.domainThrown = false; - } - domain.emit('error', er); - } else if (er instanceof Error) { - throw er; // Unhandled 'error' event - } else { - // At least give some kind of context to the user - const errors = lazyErrors(); - const err = new errors.Error('ERR_UNHANDLED_ERROR', er); - err.context = er; - throw err; - } - return false; - } - - handler = events[type]; - - if (!handler) - return false; - - if (domain && this !== process) { - domain.enter(); - needDomainExit = true; - } - - var isFn = typeof handler === 'function'; - len = arguments.length; - switch (len) { - // fast cases - case 1: - emitNone(handler, isFn, this); - break; - case 2: - emitOne(handler, isFn, this, arguments[1]); - break; - case 3: - emitTwo(handler, isFn, this, arguments[1], arguments[2]); - break; - case 4: - emitThree(handler, isFn, this, arguments[1], arguments[2], arguments[3]); - break; - // slower - default: - args = new Array(len - 1); - for (i = 1; i < len; i++) - args[i - 1] = arguments[i]; - emitMany(handler, isFn, this, args); - } - - if (needDomainExit) - domain.exit(); - - return true; -}; - -function _addListener(target, type, listener, prepend) { - var m; - var events; - var existing; - - if (typeof listener !== 'function') { - const errors = lazyErrors(); - throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'function'); - } - - events = target._events; - if (!events) { - events = target._events = Object.create(null); - target._eventsCount = 0; - } else { - // To avoid recursion in the case that type === "newListener"! Before - // adding it to the listeners, first emit "newListener". - if (events.newListener) { - target.emit('newListener', type, - listener.listener ? listener.listener : listener); - - // Re-assign `events` because a newListener handler could have caused the - // this._events to be assigned to a new object - events = target._events; - } - existing = events[type]; - } - - if (!existing) { - // Optimize the case of one listener. Don't need the extra array object. - existing = events[type] = listener; - ++target._eventsCount; - } else { - if (typeof existing === 'function') { - // Adding the second element, need to change to array. - existing = events[type] = - prepend ? [listener, existing] : [existing, listener]; - } else { - // If we've already got an array, just append. - if (prepend) { - existing.unshift(listener); - } else { - existing.push(listener); - } - } - - // Check for listener leak - if (!existing.warned) { - m = $getMaxListeners(target); - if (m && m > 0 && existing.length > m) { - existing.warned = true; - // No error code for this since it is a Warning - const w = new Error('Possible EventEmitter memory leak detected. ' + - `${existing.length} ${String(type)} listeners ` + - 'added. Use emitter.setMaxListeners() to ' + - 'increase limit'); - w.name = 'MaxListenersExceededWarning'; - w.emitter = target; - w.type = type; - w.count = existing.length; - process.emitWarning(w); - } - } - } - - return target; -} - -EventEmitter.prototype.addListener = function addListener(type, listener) { - return _addListener(this, type, listener, false); -}; - -EventEmitter.prototype.on = EventEmitter.prototype.addListener; - -EventEmitter.prototype.prependListener = - function prependListener(type, listener) { - return _addListener(this, type, listener, true); - }; - -function onceWrapper() { - if (!this.fired) { - this.target.removeListener(this.type, this.wrapFn); - this.fired = true; - switch (arguments.length) { - case 0: - return this.listener.call(this.target); - case 1: - return this.listener.call(this.target, arguments[0]); - case 2: - return this.listener.call(this.target, arguments[0], arguments[1]); - case 3: - return this.listener.call(this.target, arguments[0], arguments[1], - arguments[2]); - default: - const args = new Array(arguments.length); - for (var i = 0; i < args.length; ++i) - args[i] = arguments[i]; - this.listener.apply(this.target, args); - } - } -} - -function _onceWrap(target, type, listener) { - var state = { fired: false, wrapFn: undefined, target, type, listener }; - var wrapped = onceWrapper.bind(state); - wrapped.listener = listener; - state.wrapFn = wrapped; - return wrapped; -} - -EventEmitter.prototype.once = function once(type, listener) { - if (typeof listener !== 'function') { - const errors = lazyErrors(); - throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', 'function'); - } - this.on(type, _onceWrap(this, type, listener)); - return this; -}; - -EventEmitter.prototype.prependOnceListener = - function prependOnceListener(type, listener) { - if (typeof listener !== 'function') { - const errors = lazyErrors(); - throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', - 'function'); - } - this.prependListener(type, _onceWrap(this, type, listener)); - return this; - }; - -// Emits a 'removeListener' event if and only if the listener was removed. -EventEmitter.prototype.removeListener = - function removeListener(type, listener) { - var list, events, position, i, originalListener; - - if (typeof listener !== 'function') { - const errors = lazyErrors(); - throw new errors.TypeError('ERR_INVALID_ARG_TYPE', 'listener', - 'function'); - } - - events = this._events; - if (!events) - return this; - - list = events[type]; - if (!list) - return this; - - if (list === listener || list.listener === listener) { - if (--this._eventsCount === 0) - this._events = Object.create(null); - else { - delete events[type]; - if (events.removeListener) - this.emit('removeListener', type, list.listener || listener); - } - } else if (typeof list !== 'function') { - position = -1; - - for (i = list.length - 1; i >= 0; i--) { - if (list[i] === listener || list[i].listener === listener) { - originalListener = list[i].listener; - position = i; - break; - } - } - - if (position < 0) - return this; - - if (position === 0) - list.shift(); - else - spliceOne(list, position); - - if (list.length === 1) - events[type] = list[0]; - - if (events.removeListener) - this.emit('removeListener', type, originalListener || listener); - } - - return this; - }; - -EventEmitter.prototype.removeAllListeners = - function removeAllListeners(type) { - var listeners, events, i; - - events = this._events; - if (!events) - return this; - - // not listening for removeListener, no need to emit - if (!events.removeListener) { - if (arguments.length === 0) { - this._events = Object.create(null); - this._eventsCount = 0; - } else if (events[type]) { - if (--this._eventsCount === 0) - this._events = Object.create(null); - else - delete events[type]; - } - return this; - } - - // emit removeListener for all listeners on all events - if (arguments.length === 0) { - var keys = Object.keys(events); - var key; - for (i = 0; i < keys.length; ++i) { - key = keys[i]; - if (key === 'removeListener') continue; - this.removeAllListeners(key); - } - this.removeAllListeners('removeListener'); - this._events = Object.create(null); - this._eventsCount = 0; - return this; - } - - listeners = events[type]; - - if (typeof listeners === 'function') { - this.removeListener(type, listeners); - } else if (listeners) { - // LIFO order - for (i = listeners.length - 1; i >= 0; i--) { - this.removeListener(type, listeners[i]); - } - } - - return this; - }; - -EventEmitter.prototype.listeners = function listeners(type) { - var evlistener; - var ret; - var events = this._events; - - if (!events) - ret = []; - else { - evlistener = events[type]; - if (!evlistener) - ret = []; - else if (typeof evlistener === 'function') - ret = [evlistener.listener || evlistener]; - else - ret = unwrapListeners(evlistener); - } - - return ret; -}; - -EventEmitter.listenerCount = function(emitter, type) { - if (typeof emitter.listenerCount === 'function') { - return emitter.listenerCount(type); - } else { - return listenerCount.call(emitter, type); - } -}; - -EventEmitter.prototype.listenerCount = listenerCount; -function listenerCount(type) { - const events = this._events; - - if (events) { - const evlistener = events[type]; - - if (typeof evlistener === 'function') { - return 1; - } else if (evlistener) { - return evlistener.length; - } - } - - return 0; -} - -EventEmitter.prototype.eventNames = function eventNames() { - return this._eventsCount > 0 ? Reflect.ownKeys(this._events) : []; -}; - -// About 1.5x faster than the two-arg version of Array#splice(). -function spliceOne(list, index) { - for (var i = index, k = i + 1, n = list.length; k < n; i += 1, k += 1) - list[i] = list[k]; - list.pop(); -} - -function arrayClone(arr, n) { - var copy = new Array(n); - for (var i = 0; i < n; ++i) - copy[i] = arr[i]; - return copy; -} - -function unwrapListeners(arr) { - const ret = new Array(arr.length); - for (var i = 0; i < ret.length; ++i) { - ret[i] = arr[i].listener || arr[i]; - } - return ret; -} +module.exports = EventEmitter; diff --git a/lib/extras/events.js b/lib/extras/events.js new file mode 100644 index 0000000000..d06d89cce5 --- /dev/null +++ b/lib/extras/events.js @@ -0,0 +1,554 @@ +// Copyright Joyent, Inc. and other Node contributors. +// +// Permission is hereby granted, free of charge, to any person obtaining a +// copy of this software and associated documentation files (the +// "Software"), to deal in the Software without restriction, including +// without limitation the rights to use, copy, modify, merge, publish, +// distribute, sublicense, and/or sell copies of the Software, and to permit +// persons to whom the Software is furnished to do so, subject to the +// following conditions: +// +// The above copyright notice and this permission notice shall be included +// in all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN +// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE +// USE OR OTHER DEALINGS IN THE SOFTWARE. + +/* eslint-disable indent, strict */ +(function(global, binding, v8) { + +'use strict'; + +function EventEmitter() { + EventEmitter.init.call(this); +} +binding.EventEmitter = EventEmitter; + +// Backwards-compat with node 0.10.x +EventEmitter.EventEmitter = EventEmitter; + +// Overridden when domain module is loaded +EventEmitter.usingDomains = false; + +EventEmitter.prototype.domain = undefined; +EventEmitter.prototype._events = undefined; +EventEmitter.prototype._maxListeners = undefined; + +// By default EventEmitters will print a warning if more than 10 listeners are +// added to it. This is a useful default which helps finding memory leaks. +var defaultMaxListeners = 10; + +Object.defineProperty(EventEmitter, 'defaultMaxListeners', { + enumerable: true, + get: function() { + return defaultMaxListeners; + }, + set: function(arg) { + // force global console to be compiled. + // see https://github.com/nodejs/node/issues/4467 + console; + // check whether the input is a positive number (whose value is zero or + // greater and not a NaN). + if (typeof arg !== 'number' || arg < 0 || arg !== arg) { + const err = + new TypeError('The "defaultMaxListeners" argument is out of range'); + err.code = 'ERR_OUT_OF_RANGE'; + throw err; + } + defaultMaxListeners = arg; + } +}); + +// This version of the function is not domain-aware, suitable for contexts that +// do not have access to Node.js domains. It will get monkey patched in +// lib/events.js. +EventEmitter.init = function() { + this.domain = null; + + if (!this._events || this._events === Object.getPrototypeOf(this)._events) { + this._events = Object.create(null); + this._eventsCount = 0; + } + + this._maxListeners = this._maxListeners || undefined; +}; + +// Obviously not all Emitters should be limited to 10. This function allows +// that to be increased. Set to zero for unlimited. +EventEmitter.prototype.setMaxListeners = function setMaxListeners(n) { + if (typeof n !== 'number' || n < 0 || isNaN(n)) { + const err = new TypeError('The "n" argument is out of range'); + err.code = 'ERR_OUT_OF_RANGE'; + throw err; + } + this._maxListeners = n; + return this; +}; + +function $getMaxListeners(that) { + if (that._maxListeners === undefined) + return EventEmitter.defaultMaxListeners; + return that._maxListeners; +} + +EventEmitter.prototype.getMaxListeners = function getMaxListeners() { + return $getMaxListeners(this); +}; + +// These standalone emit* functions are used to optimize calling of event +// handlers for fast cases because emit() itself often has a variable number of +// arguments and can be deoptimized because of that. These functions always have +// the same number of arguments and thus do not get deoptimized, so the code +// inside them can execute faster. +function emitNone(handler, isFn, self) { + if (isFn) + handler.call(self); + else { + var len = handler.length; + var listeners = arrayClone(handler, len); + for (var i = 0; i < len; ++i) + listeners[i].call(self); + } +} +function emitOne(handler, isFn, self, arg1) { + if (isFn) + handler.call(self, arg1); + else { + var len = handler.length; + var listeners = arrayClone(handler, len); + for (var i = 0; i < len; ++i) + listeners[i].call(self, arg1); + } +} +function emitTwo(handler, isFn, self, arg1, arg2) { + if (isFn) + handler.call(self, arg1, arg2); + else { + var len = handler.length; + var listeners = arrayClone(handler, len); + for (var i = 0; i < len; ++i) + listeners[i].call(self, arg1, arg2); + } +} +function emitThree(handler, isFn, self, arg1, arg2, arg3) { + if (isFn) + handler.call(self, arg1, arg2, arg3); + else { + var len = handler.length; + var listeners = arrayClone(handler, len); + for (var i = 0; i < len; ++i) + listeners[i].call(self, arg1, arg2, arg3); + } +} + +function emitMany(handler, isFn, self, args) { + if (isFn) + handler.apply(self, args); + else { + var len = handler.length; + var listeners = arrayClone(handler, len); + for (var i = 0; i < len; ++i) + listeners[i].apply(self, args); + } +} + +EventEmitter.prototype.emit = function emit(type) { + var er, handler, len, args, i, events, domain; + var needDomainExit = false; + var doError = (type === 'error'); + + events = this._events; + if (events) + doError = (doError && events.error == null); + else if (!doError) + return false; + + domain = this.domain; + + // If there is no 'error' event listener then throw. + if (doError) { + if (arguments.length > 1) + er = arguments[1]; + if (domain) { + if (!er) { + er = new Error('Unhandled "error" event'); + er.code = 'ERR_UNHANDLED_ERROR'; + } + if (typeof er === 'object' && er !== null) { + er.domainEmitter = this; + er.domain = domain; + er.domainThrown = false; + } + domain.emit('error', er); + } else if (er instanceof Error) { + throw er; // Unhandled 'error' event + } else { + // At least give some kind of context to the user + const err = new Error('Unhandled error. (' + er + ')'); + err.context = er; + err.code = 'ERR_UNHANDLED_ERROR'; + throw err; + } + return false; + } + + handler = events[type]; + + if (!handler) + return false; + + if (typeof process !== 'undefined' && domain && this !== process) { + domain.enter(); + needDomainExit = true; + } + + var isFn = typeof handler === 'function'; + len = arguments.length; + switch (len) { + // fast cases + case 1: + emitNone(handler, isFn, this); + break; + case 2: + emitOne(handler, isFn, this, arguments[1]); + break; + case 3: + emitTwo(handler, isFn, this, arguments[1], arguments[2]); + break; + case 4: + emitThree(handler, isFn, this, arguments[1], arguments[2], arguments[3]); + break; + // slower + default: + args = new Array(len - 1); + for (i = 1; i < len; i++) + args[i - 1] = arguments[i]; + emitMany(handler, isFn, this, args); + } + + if (needDomainExit) + domain.exit(); + + return true; +}; + +function _addListener(target, type, listener, prepend) { + var m; + var events; + var existing; + + if (typeof listener !== 'function') { + const err = + new TypeError('The "listener" argument must be of type function'); + err.code = 'ERR_INVALID_ARG_TYPE'; + throw err; + } + + events = target._events; + if (!events) { + events = target._events = Object.create(null); + target._eventsCount = 0; + } else { + // To avoid recursion in the case that type === "newListener"! Before + // adding it to the listeners, first emit "newListener". + if (events.newListener) { + target.emit('newListener', type, + listener.listener ? listener.listener : listener); + + // Re-assign `events` because a newListener handler could have caused the + // this._events to be assigned to a new object + events = target._events; + } + existing = events[type]; + } + + if (!existing) { + // Optimize the case of one listener. Don't need the extra array object. + existing = events[type] = listener; + ++target._eventsCount; + } else { + if (typeof existing === 'function') { + // Adding the second element, need to change to array. + existing = events[type] = + prepend ? [listener, existing] : [existing, listener]; + } else { + // If we've already got an array, just append. + if (prepend) { + existing.unshift(listener); + } else { + existing.push(listener); + } + } + + // Check for listener leak + // Ignore if the current context does not have a process object + if (typeof process !== 'undefined' && !existing.warned) { + m = $getMaxListeners(target); + if (m && m > 0 && existing.length > m) { + existing.warned = true; + const w = new Error('Possible EventEmitter memory leak detected. ' + + `${existing.length} ${String(type)} listeners ` + + 'added. Use emitter.setMaxListeners() to ' + + 'increase limit'); + w.name = 'MaxListenersExceededWarning'; + w.emitter = target; + w.type = type; + w.count = existing.length; + process.emitWarning(w); + } + } + } + + return target; +} + +EventEmitter.prototype.addListener = function addListener(type, listener) { + return _addListener(this, type, listener, false); +}; + +EventEmitter.prototype.on = EventEmitter.prototype.addListener; + +EventEmitter.prototype.prependListener = + function prependListener(type, listener) { + return _addListener(this, type, listener, true); + }; + +function onceWrapper() { + if (!this.fired) { + this.target.removeListener(this.type, this.wrapFn); + this.fired = true; + switch (arguments.length) { + case 0: + return this.listener.call(this.target); + case 1: + return this.listener.call(this.target, arguments[0]); + case 2: + return this.listener.call(this.target, arguments[0], arguments[1]); + case 3: + return this.listener.call(this.target, arguments[0], arguments[1], + arguments[2]); + default: + const args = new Array(arguments.length); + for (var i = 0; i < args.length; ++i) + args[i] = arguments[i]; + this.listener.apply(this.target, args); + } + } +} + +function _onceWrap(target, type, listener) { + var state = { fired: false, wrapFn: undefined, target, type, listener }; + var wrapped = onceWrapper.bind(state); + wrapped.listener = listener; + state.wrapFn = wrapped; + return wrapped; +} + +EventEmitter.prototype.once = function once(type, listener) { + if (typeof listener !== 'function') { + const err = + new TypeError('The "listener" argument must be of type function'); + err.code = 'ERR_INVALID_ARG_TYPE'; + throw err; + } + this.on(type, _onceWrap(this, type, listener)); + return this; +}; + +EventEmitter.prototype.prependOnceListener = + function prependOnceListener(type, listener) { + if (typeof listener !== 'function') { + const err = + new TypeError('The "listener" argument must be of type function'); + err.code = 'ERR_INVALID_ARG_TYPE'; + throw err; + } + this.prependListener(type, _onceWrap(this, type, listener)); + return this; + }; + +// Emits a 'removeListener' event if and only if the listener was removed. +EventEmitter.prototype.removeListener = + function removeListener(type, listener) { + var list, events, position, i, originalListener; + + if (typeof listener !== 'function') { + const err = + new TypeError('The "listener" argument must be of type function'); + err.code = 'ERR_INVALID_ARG_TYPE'; + throw err; + } + + events = this._events; + if (!events) + return this; + + list = events[type]; + if (!list) + return this; + + if (list === listener || list.listener === listener) { + if (--this._eventsCount === 0) + this._events = Object.create(null); + else { + delete events[type]; + if (events.removeListener) + this.emit('removeListener', type, list.listener || listener); + } + } else if (typeof list !== 'function') { + position = -1; + + for (i = list.length - 1; i >= 0; i--) { + if (list[i] === listener || list[i].listener === listener) { + originalListener = list[i].listener; + position = i; + break; + } + } + + if (position < 0) + return this; + + if (position === 0) + list.shift(); + else + spliceOne(list, position); + + if (list.length === 1) + events[type] = list[0]; + + if (events.removeListener) + this.emit('removeListener', type, originalListener || listener); + } + + return this; + }; + +EventEmitter.prototype.removeAllListeners = + function removeAllListeners(type) { + var listeners, events, i; + + events = this._events; + if (!events) + return this; + + // not listening for removeListener, no need to emit + if (!events.removeListener) { + if (arguments.length === 0) { + this._events = Object.create(null); + this._eventsCount = 0; + } else if (events[type]) { + if (--this._eventsCount === 0) + this._events = Object.create(null); + else + delete events[type]; + } + return this; + } + + // emit removeListener for all listeners on all events + if (arguments.length === 0) { + var keys = Object.keys(events); + var key; + for (i = 0; i < keys.length; ++i) { + key = keys[i]; + if (key === 'removeListener') continue; + this.removeAllListeners(key); + } + this.removeAllListeners('removeListener'); + this._events = Object.create(null); + this._eventsCount = 0; + return this; + } + + listeners = events[type]; + + if (typeof listeners === 'function') { + this.removeListener(type, listeners); + } else if (listeners) { + // LIFO order + for (i = listeners.length - 1; i >= 0; i--) { + this.removeListener(type, listeners[i]); + } + } + + return this; + }; + +EventEmitter.prototype.listeners = function listeners(type) { + var evlistener; + var ret; + var events = this._events; + + if (!events) + ret = []; + else { + evlistener = events[type]; + if (!evlistener) + ret = []; + else if (typeof evlistener === 'function') + ret = [evlistener.listener || evlistener]; + else + ret = unwrapListeners(evlistener); + } + + return ret; +}; + +EventEmitter.listenerCount = function(emitter, type) { + if (typeof emitter.listenerCount === 'function') { + return emitter.listenerCount(type); + } else { + return listenerCount.call(emitter, type); + } +}; + +EventEmitter.prototype.listenerCount = listenerCount; +function listenerCount(type) { + const events = this._events; + + if (events) { + const evlistener = events[type]; + + if (typeof evlistener === 'function') { + return 1; + } else if (evlistener) { + return evlistener.length; + } + } + + return 0; +} + +EventEmitter.prototype.eventNames = function eventNames() { + return this._eventsCount > 0 ? Reflect.ownKeys(this._events) : []; +}; + +// About 1.5x faster than the two-arg version of Array#splice(). +function spliceOne(list, index) { + for (var i = index, k = i + 1, n = list.length; k < n; i += 1, k += 1) + list[i] = list[k]; + list.pop(); +} + +function arrayClone(arr, n) { + var copy = new Array(n); + for (var i = 0; i < n; ++i) + copy[i] = arr[i]; + return copy; +} + +function unwrapListeners(arr) { + const ret = new Array(arr.length); + for (var i = 0; i < ret.length; ++i) { + ret[i] = arr[i].listener || arr[i]; + } + return ret; +} + +}); diff --git a/lib/extras/messaging.js b/lib/extras/messaging.js new file mode 100644 index 0000000000..cc8373890f --- /dev/null +++ b/lib/extras/messaging.js @@ -0,0 +1,82 @@ +/* eslint-disable indent, strict */ +(function(global, binding, v8) { + +'use strict'; + +const { defineProperties, setPrototypeOf } = global.Object; + +// A communication channel consisting of a handle (that wraps around an +// uv_async_t) which can receive information from other threads and emits +// .onmessage events, and a function used for sending data to a MessagePort +// in some other thread. +function onmessage(payload, flag) { + if (flag !== 0 /*MESSAGE_FLAG_NONE*/ && + flag < 100 /*MESSAGE_FLAG_CUSTOM_OFFSET*/) { + // This was not handled in C++, but it is also not a custom message in the + // sense that it was generated in JS, so some special handling is still + // required for deserialization. + // (This is primarily for error situations) + // debug(`[${threadId}] received raw message`, flag, payload); + return this.emit('flaggedMessage', flag, payload); + } + + // debug(`[${threadId}] received message`, flag, payload); + // Emit the flag and deserialized object to userland. + if (flag === 0 || flag === undefined) + this.emit('message', payload); + else + this.emit('flaggedMessage', flag, payload); +} + +function oninit() { + // Keep track of whether there are any workerMessage listeners: + // If there are some, ref() the channel so it keeps the event loop alive. + // If there are none or all are removed, unref() the channel so the worker + // can shutdown gracefully. + this.unref(); + this.on('newListener', (name) => { + if (name === 'message' && this.listenerCount('message') === 0) { + this.ref(); + this.start(); + } + }); + this.on('removeListener', (name) => { + if (name === 'message' && this.listenerCount('message') === 0) { + this.stop(); + this.unref(); + } + }); +} + +function onclose() { + this.emit('close'); +} + +function makeMessagePort(MessagePort) { + setPrototypeOf(MessagePort, binding.EventEmitter); + setPrototypeOf(MessagePort.prototype, binding.EventEmitter.prototype); + + defineProperties(MessagePort.prototype, { + onmessage: { + enumerable: true, + writable: false, + value: onmessage + }, + + oninit: { + enumerable: true, + writable: false, + value: oninit + }, + + onclose: { + enumerable: true, + writable: false, + value: onclose + } + }); +} + +binding.makeMessagePort = makeMessagePort; + +}); diff --git a/lib/internal/bootstrap_node.js b/lib/internal/bootstrap_node.js index 4ebc81c488..edcc3eb4f0 100644 --- a/lib/internal/bootstrap_node.js +++ b/lib/internal/bootstrap_node.js @@ -11,6 +11,10 @@ let internalBinding; let isMainThread; + // Add process to global first as it may be used by native modules loaded + // later on. + global.process = process; + function startup() { const EventEmitter = NativeModule.require('events'); process._eventsCount = 0; @@ -265,7 +269,6 @@ enumerable: false, configurable: true }); - global.process = process; const util = NativeModule.require('util'); function makeGetter(name) { diff --git a/lib/internal/error-serdes.js b/lib/internal/error-serdes.js new file mode 100644 index 0000000000..afedc3ae58 --- /dev/null +++ b/lib/internal/error-serdes.js @@ -0,0 +1,118 @@ +'use strict'; + +const Buffer = require('buffer').Buffer; +const { serialize, deserialize } = require('v8'); +const { SafeSet } = require('internal/safe_globals'); + +const kSerializedError = 0; +const kSerializedObject = 1; +const kInspectedError = 2; + +const GetPrototypeOf = Object.getPrototypeOf; +const GetOwnPropertyDescriptor = Object.getOwnPropertyDescriptor; +const GetOwnPropertyNames = Object.getOwnPropertyNames; +const DefineProperty = Object.defineProperty; +const Assign = Object.assign; +const ObjectPrototypeToString = + Function.prototype.call.bind(Object.prototype.toString); +const ForEach = Function.prototype.call.bind(Array.prototype.forEach); +const Call = Function.prototype.call.bind(Function.prototype.call); + +const errors = { + Error, TypeError, RangeError, URIError, SyntaxError, ReferenceError, EvalError +}; +const errorConstructorNames = new SafeSet(Object.keys(errors)); + +function TryGetAllProperties(object, target = object) { + const all = Object.create(null); + if (object === null) + return all; + Assign(all, TryGetAllProperties(GetPrototypeOf(object), target)); + const keys = GetOwnPropertyNames(object); + ForEach(keys, (key) => { + const descriptor = GetOwnPropertyDescriptor(object, key); + const getter = descriptor.get; + if (getter && key !== '__proto__') { + try { + descriptor.value = Call(getter, target); + } catch (e) {} + } + if ('value' in descriptor && typeof descriptor.value !== 'function') { + delete descriptor.get; + delete descriptor.set; + all[key] = descriptor; + } + }); + return all; +} + +function GetConstructors(object) { + const constructors = []; + + for (var current = object; + current !== null; + current = GetPrototypeOf(current)) { + const desc = GetOwnPropertyDescriptor(current, 'constructor'); + if (desc && desc.value) { + DefineProperty(constructors, constructors.length, { + value: desc.value, enumerable: true + }); + } + } + + return constructors; +} + +function GetName(object) { + const desc = GetOwnPropertyDescriptor(object, 'name'); + return desc && desc.value; +} + +let util; +function lazyUtil() { + if (!util) + util = require('util'); + return util; +} + +function serializeError(error) { + try { + if (typeof error === 'object' && + ObjectPrototypeToString(error) === '[object Error]') { + const constructors = GetConstructors(error); + for (var i = constructors.length - 1; i >= 0; i--) { + const name = GetName(constructors[i]); + if (errorConstructorNames.has(name)) { + try { error.stack; } catch (e) {} + const serialized = serialize({ + constructor: name, + properties: TryGetAllProperties(error) + }); + return Buffer.concat([Buffer.from([kSerializedError]), serialized]); + } + } + } + } catch (e) {} + try { + const serialized = serialize(error); + return Buffer.concat([Buffer.from([kSerializedObject]), serialized]); + } catch (e) {} + return Buffer.concat([Buffer.from([kInspectedError]), + Buffer.from(lazyUtil().inspect(error), 'utf8')]); +} + +function deserializeError(error) { + switch (error[0]) { + case kSerializedError: + const { constructor, properties } = deserialize(error.slice(1)); + const ctor = errors[constructor]; + return Object.create(ctor.prototype, properties); + case kSerializedObject: + return deserialize(error.slice(1)); + case kInspectedError: + return error.toString('utf8', 1); + } + require('assert').fail('This should not happen'); +} + +module.exports = { serializeError, deserializeError }; diff --git a/lib/internal/errors.js b/lib/internal/errors.js index ec8f7c1885..7a5fab2cd0 100755 --- a/lib/internal/errors.js +++ b/lib/internal/errors.js @@ -357,6 +357,8 @@ E('ERR_WORKER_NEED_ABSOLUTE_PATH', E('ERR_WORKER_OUT_OF_MEMORY', 'The worker script ran out of memory'); E('ERR_WORKER_UNSERIALIZABLE_ERROR', 'Serializing an uncaught exception failed'); +E('ERR_WORKER_UNSUPPORTED_EXTENSION', + 'The worker script extension must be ".js" or ".mjs". Received "%s"'); E('ERR_ZLIB_BINDING_CLOSED', 'zlib binding closed'); function invalidArgType(name, expected, actual) { diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 86530346f5..2ea824b6c1 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -1,6 +1,5 @@ 'use strict'; -const Buffer = require('buffer').Buffer; const EventEmitter = require('events'); const assert = require('assert'); const path = require('path'); @@ -8,6 +7,8 @@ const util = require('util'); const errors = require('internal/errors'); const { MessagePort, MessageChannel } = internalBinding('messaging'); +const { serializeError, deserializeError } = require('internal/error-serdes'); + util.inherits(MessagePort, EventEmitter); const { @@ -41,65 +42,8 @@ const debug = util.debuglog('worker'); const kUpAndRunning = MESSAGE_FLAG_CUSTOM_OFFSET; const kLoadScript = MESSAGE_FLAG_CUSTOM_OFFSET + 1; -// A communication channel consisting of a handle (that wraps around an -// uv_async_t) which can receive information from other threads and emits -// .onmessage events, and a function used for sending data to a MessagePort -// in some other thread. -function onmessage(payload, flag) { - if (flag !== MESSAGE_FLAG_NONE && flag < MESSAGE_FLAG_CUSTOM_OFFSET) { - // This was not handled in C++, but it is also not a custom message in the - // sense that it was generated in JS, so some special handling is still - // required for deserialization. - // (This is primarily for error situations) - debug(`[${threadId}] received raw message`, flag, payload); - return this.emit('flaggedMessage', flag, payload); - } - - debug(`[${threadId}] received message`, flag, payload); - // Emit the flag and deserialized object to userland. - if (flag === 0 || flag === undefined) - this.emit('message', payload); - else - this.emit('flaggedMessage', flag, payload); -} - -Object.defineProperty(MessagePort.prototype, 'onmessage', { - enumerable: true, - configurable: true, - get() { return onmessage; }, - set(value) { - Object.defineProperty(this, { - writable: true, - enumerable: true, - configurable: true, - value - }); - this.ref(); - this.start(); - } -}); - -function oninit() { - setupPortReferencing(this, this, 'message'); -} - -Object.defineProperty(MessagePort.prototype, 'oninit', { - enumerable: true, - writable: false, - value: oninit -}); - -function onclose() { - this.emit('close'); -} - -Object.defineProperty(MessagePort.prototype, 'onclose', { - enumerable: true, - writable: false, - value: onclose -}); - function setupPortReferencing(port, eventEmitter, eventName) { + // TODO(addaleax): Merge with oninit() in lib/extras/messaging.js // Keep track of whether there are any workerMessage listeners: // If there are some, ref() the channel so it keeps the event loop alive. // If there are none or all are removed, unref() the channel so the worker @@ -129,8 +73,14 @@ class Worker extends EventEmitter { 'string', filename); } - if (!options.eval && !path.isAbsolute(filename)) { - throw new errors.TypeError('ERR_WORKER_NEED_ABSOLUTE_PATH', filename); + if (!options.eval) { + if (!path.isAbsolute(filename)) { + throw new errors.TypeError('ERR_WORKER_NEED_ABSOLUTE_PATH', filename); + } + const ext = path.extname(filename); + if (ext !== '.js' && ext !== '.mjs') { + throw new errors.TypeError('ERR_WORKER_UNSUPPORTED_EXTENSION', ext); + } } const resourceLimits = { @@ -271,14 +221,6 @@ function setupChild(evalScript) { port.start(); } -// TODO(addaleax): These can be improved a lot. -function serializeError(error) { - return Buffer.from(util.inspect(error), 'utf8'); -} - -function deserializeError(error) { - return error.toString('utf8'); -} module.exports = { MessagePort, diff --git a/lib/vm.js b/lib/vm.js index e7fccc9749..f4a5d7443b 100644 --- a/lib/vm.js +++ b/lib/vm.js @@ -30,6 +30,8 @@ const { runInDebugContext } = process.binding('contextify'); +const { moveMessagePortToContext } = internalBinding('messaging'); + // The binding provides a few useful primitives: // - Script(code, { filename = "evalmachine.anonymous", // displayErrors = true } = {}) @@ -143,6 +145,7 @@ module.exports = { Script, createContext, createScript, + moveMessagePortToContext, runInDebugContext, runInContext, runInNewContext, diff --git a/node.gyp b/node.gyp index 76423c9554..b979d86d4d 100644 --- a/node.gyp +++ b/node.gyp @@ -95,6 +95,7 @@ 'lib/internal/crypto/util.js', 'lib/internal/encoding.js', 'lib/internal/errors.js', + 'lib/internal/error-serdes.js', 'lib/internal/freelist.js', 'lib/internal/fs.js', 'lib/internal/http.js', @@ -206,6 +207,7 @@ 'src/node_constants.cc', 'src/node_contextify.cc', 'src/node_debug_options.cc', + 'src/node_extras.cc', 'src/node_file.cc', 'src/node_http2.cc', 'src/node_http_parser.cc', @@ -259,6 +261,7 @@ 'src/node_http2_core-inl.h', 'src/node_buffer.h', 'src/node_constants.h', + 'src/node_contextify.h', 'src/node_debug_options.h', 'src/node_http2.h', 'src/node_http2_state.h', @@ -689,10 +692,12 @@ '<(OBJ_PATH)<(OBJ_SEPARATOR)handle_wrap.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)node.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)node_buffer.<(OBJ_SUFFIX)', + '<(OBJ_PATH)<(OBJ_SEPARATOR)node_contextify.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)node_i18n.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)node_messaging.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)node_perf.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)node_url.<(OBJ_SUFFIX)', + '<(OBJ_PATH)<(OBJ_SEPARATOR)node_watchdog.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)node_worker.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)util.<(OBJ_SUFFIX)', '<(OBJ_PATH)<(OBJ_SEPARATOR)sharedarraybuffer-metadata.<(OBJ_SUFFIX)', diff --git a/src/env.h b/src/env.h index cc8f5a6303..d980122dda 100644 --- a/src/env.h +++ b/src/env.h @@ -95,6 +95,7 @@ class Worker; V(contextify_context_private_symbol, "node:contextify:context") \ V(contextify_global_private_symbol, "node:contextify:global") \ V(decorated_private_symbol, "node:decorated") \ + V(messageport_initialized_private_symbol, "node:messagePortInitialized") \ V(npn_buffer_private_symbol, "node:npnBuffer") \ V(processed_private_symbol, "node:processed") \ V(sab_lifetimepartner_symbol, "node:sharedArrayBufferLifetimePartner") \ diff --git a/src/node.cc b/src/node.cc index 640fbdf8a8..c0b67d6e15 100644 --- a/src/node.cc +++ b/src/node.cc @@ -2663,11 +2663,28 @@ node_module* get_linked_module(const char* name) { return FindModule(modlist_linked, name, NM_F_LINKED); } -struct DLib { +namespace { + +Mutex dlib_mutex; + +struct DLib; + +std::unordered_map