diff --git a/common.gypi b/common.gypi index 1b35aaf641..5947af16af 100644 --- a/common.gypi +++ b/common.gypi @@ -29,7 +29,8 @@ 'v8_enable_disassembler': 1, 'v8_extra_library_files': [ - './lib/extras/events.js' + './lib/extras/events.js', + './lib/extras/messaging.js' ], # Don't bake anything extra into the snapshot. diff --git a/doc/api/vm.md b/doc/api/vm.md index bbc2db933e..500cb799ea 100644 --- a/doc/api/vm.md +++ b/doc/api/vm.md @@ -325,9 +325,6 @@ object, whose prototype and methods act as if they were created in the passed context. The received messages will also be emitted as objects from the passed context. -Note that the return instance is *not* an `EventEmitter`; for receiving -messages, the `.onmessage` property can be used. - The `port` object on which this method was called can not be used for sending or receiving further messages. diff --git a/lib/extras/messaging.js b/lib/extras/messaging.js new file mode 100644 index 0000000000..44b8aa0d78 --- /dev/null +++ b/lib/extras/messaging.js @@ -0,0 +1,76 @@ +(function(global, binding, v8) { + +'use strict'; + +const { defineProperty, setPrototypeOf } = global.Object; + +// A communication channel consisting of a handle (that wraps around an +// uv_async_t) which can receive information from other threads and emits +// .onmessage events, and a function used for sending data to a MessagePort +// in some other thread. +function onmessage(payload, flag) { + if (flag !== 0 /*MESSAGE_FLAG_NONE*/ && flag < 100 /*MESSAGE_FLAG_CUSTOM_OFFSET*/) { + // This was not handled in C++, but it is also not a custom message in the + // sense that it was generated in JS, so some special handling is still + // required for deserialization. + // (This is primarily for error situations) + // debug(`[${threadId}] received raw message`, flag, payload); + return this.emit('flaggedMessage', flag, payload); + } + + // debug(`[${threadId}] received message`, flag, payload); + // Emit the flag and deserialized object to userland. + if (flag === 0 || flag === undefined) + this.emit('message', payload); + else + this.emit('flaggedMessage', flag, payload); +} + +function oninit() { + // Keep track of whether there are any workerMessage listeners: + // If there are some, ref() the channel so it keeps the event loop alive. + // If there are none or all are removed, unref() the channel so the worker + // can shutdown gracefully. + this.unref(); + this.on('newListener', (name) => { + if (name === 'message' && this.listenerCount('message') === 0) { + this.ref(); + this.start(); + } + }); + this.on('removeListener', (name) => { + if (name === 'message' && this.listenerCount('message') === 0) { + this.unref(); + } + }); +} + +function onclose() { + this.emit('close'); +} + +function makeMessagePort(MessagePort) { + setPrototypeOf(MessagePort.prototype, binding.EventEmitter.prototype); + + defineProperty(MessagePort.prototype, 'onmessage', { + enumerable: false, + writable: false, + value: onmessage + }); + + defineProperty(MessagePort.prototype, 'oninit', { + enumerable: false, + writable: false, + value: oninit + }); + + defineProperty(MessagePort.prototype, 'onclose', { + enumerable: false, + writable: false, + value: onclose + }); +} + +binding.makeMessagePort = makeMessagePort; + +}); diff --git a/lib/internal/worker.js b/lib/internal/worker.js index 006f4f6174..ad800d6173 100644 --- a/lib/internal/worker.js +++ b/lib/internal/worker.js @@ -41,64 +41,6 @@ const debug = util.debuglog('worker'); const kUpAndRunning = MESSAGE_FLAG_CUSTOM_OFFSET; const kLoadScript = MESSAGE_FLAG_CUSTOM_OFFSET + 1; -// A communication channel consisting of a handle (that wraps around an -// uv_async_t) which can receive information from other threads and emits -// .onmessage events, and a function used for sending data to a MessagePort -// in some other thread. -function onmessage(payload, flag) { - if (flag !== MESSAGE_FLAG_NONE && flag < MESSAGE_FLAG_CUSTOM_OFFSET) { - // This was not handled in C++, but it is also not a custom message in the - // sense that it was generated in JS, so some special handling is still - // required for deserialization. - // (This is primarily for error situations) - debug(`[${threadId}] received raw message`, flag, payload); - return this.emit('flaggedMessage', flag, payload); - } - - debug(`[${threadId}] received message`, flag, payload); - // Emit the flag and deserialized object to userland. - if (flag === 0 || flag === undefined) - this.emit('message', payload); - else - this.emit('flaggedMessage', flag, payload); -} - -Object.defineProperty(MessagePort.prototype, 'onmessage', { - enumerable: false, - configurable: true, - get() { return onmessage; }, - set(value) { - Object.defineProperty(this, { - writable: true, - enumerable: true, - configurable: true, - value - }); - this.ref(); - this.start(); - } -}); - -function oninit() { - setupPortReferencing(this, this, 'message'); -} - -Object.defineProperty(MessagePort.prototype, 'oninit', { - enumerable: false, - writable: false, - value: oninit -}); - -function onclose() { - this.emit('close'); -} - -Object.defineProperty(MessagePort.prototype, 'onclose', { - enumerable: false, - writable: false, - value: onclose -}); - function setupPortReferencing(port, eventEmitter, eventName) { // Keep track of whether there are any workerMessage listeners: // If there are some, ref() the channel so it keeps the event loop alive. diff --git a/src/env.h b/src/env.h index 90db700daf..bda4d49164 100644 --- a/src/env.h +++ b/src/env.h @@ -92,6 +92,7 @@ class Worker; V(contextify_global_private_symbol, "node:contextify:global") \ V(inspector_delegate_private_symbol, "node:inspector:delegate") \ V(decorated_private_symbol, "node:decorated") \ + V(messageport_initialized_private_symbol, "node:messagePortInitialized") \ V(npn_buffer_private_symbol, "node:npnBuffer") \ V(processed_private_symbol, "node:processed") \ V(sab_lifetimepartner_symbol, "node:sharedArrayBufferLiftimePartner") \ diff --git a/src/node_messaging.cc b/src/node_messaging.cc index a125061af5..4f779c2d8d 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -26,8 +26,10 @@ using v8::Maybe; using v8::MaybeLocal; using v8::Nothing; using v8::Object; +using v8::Private; using v8::SharedArrayBuffer; using v8::String; +using v8::Undefined; using v8::Value; using v8::ValueDeserializer; using v8::ValueSerializer; @@ -754,8 +756,30 @@ MaybeLocal GetMessagePortConstructor( // of code, because it is needed early on in the child environment setup. Local templ; templ = env->message_port_constructor_template(); - if (!templ.IsEmpty()) - return templ->GetFunction(context); + if (!templ.IsEmpty()) { + auto maybe_ctor = templ->GetFunction(context); + Local ctor; + if (!maybe_ctor.ToLocal(&ctor)) return maybe_ctor; + + // Set up EventEmitter inheritance and some default listners. + Local initialized = env->messageport_initialized_private_symbol(); + if (!ctor->HasPrivate(context, initialized).FromJust()) { + Local extras = context->GetExtrasBindingObject(); + Local make_message_port = + extras->Get(context, + FIXED_ONE_BYTE_STRING(env->isolate(), "makeMessagePort")) + .ToLocalChecked().As(); + CHECK(make_message_port->IsFunction()); + Local args[] = { ctor }; + make_message_port->Call(context, Undefined(env->isolate()), + arraysize(args), args).ToLocalChecked(); + + ctor->SetPrivate(context, initialized, Undefined(env->isolate())) + .FromJust(); + } + + return maybe_ctor; + } { Local m = env->NewFunctionTemplate(MessagePort::New); diff --git a/test/parallel/test-message-channel-move.js b/test/parallel/test-message-channel-move.js index ba140dc064..abbba99df1 100644 --- a/test/parallel/test-message-channel-move.js +++ b/test/parallel/test-message-channel-move.js @@ -16,13 +16,12 @@ const { MessageChannel } = require('worker'); { assert(port instanceof Object); - assert(port.onmessage === undefined); + assert(port.onmessage instanceof Function); assert(port.postMessage instanceof Function); - port.onmessage = function(msg) { + port.on('message', (msg) => { assert(msg instanceof Object); port.postMessage(msg); - }; - port.start(); + }); } {