Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

Commit

Permalink
Make MessagePort an EventEmitter everywhere
Browse files Browse the repository at this point in the history
  • Loading branch information
TimothyGu committed Sep 20, 2017
1 parent bb85e11 commit 62a0263
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 68 deletions.
3 changes: 2 additions & 1 deletion common.gypi
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
'v8_enable_disassembler': 1,

'v8_extra_library_files': [
'./lib/extras/events.js'
'./lib/extras/events.js',
'./lib/extras/messaging.js'
],

# Don't bake anything extra into the snapshot.
Expand Down
3 changes: 0 additions & 3 deletions doc/api/vm.md
Original file line number Diff line number Diff line change
Expand Up @@ -325,9 +325,6 @@ object, whose prototype and methods act as if they were created in the passed
context. The received messages will also be emitted as objects from the passed
context.

Note that the return instance is *not* an `EventEmitter`; for receiving
messages, the `.onmessage` property can be used.

The `port` object on which this method was called can not be used for sending
or receiving further messages.

Expand Down
76 changes: 76 additions & 0 deletions lib/extras/messaging.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
(function(global, binding, v8) {

'use strict';

const { defineProperty, setPrototypeOf } = global.Object;

// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
function onmessage(payload, flag) {
if (flag !== 0 /*MESSAGE_FLAG_NONE*/ && flag < 100 /*MESSAGE_FLAG_CUSTOM_OFFSET*/) {
// This was not handled in C++, but it is also not a custom message in the
// sense that it was generated in JS, so some special handling is still
// required for deserialization.
// (This is primarily for error situations)
// debug(`[${threadId}] received raw message`, flag, payload);
return this.emit('flaggedMessage', flag, payload);
}

// debug(`[${threadId}] received message`, flag, payload);
// Emit the flag and deserialized object to userland.
if (flag === 0 || flag === undefined)
this.emit('message', payload);
else
this.emit('flaggedMessage', flag, payload);
}

function oninit() {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
this.unref();
this.on('newListener', (name) => {
if (name === 'message' && this.listenerCount('message') === 0) {
this.ref();
this.start();
}
});
this.on('removeListener', (name) => {
if (name === 'message' && this.listenerCount('message') === 0) {
this.unref();
}
});
}

function onclose() {
this.emit('close');
}

function makeMessagePort(MessagePort) {
setPrototypeOf(MessagePort.prototype, binding.EventEmitter.prototype);

This comment has been minimized.

Copy link
@addaleax

addaleax Sep 20, 2017

Contributor

I think there should be a second prototype chain for the constructors as well (we can choose to not do that, but it’s what ES classes do)


defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: false,
writable: false,
value: onmessage
});

defineProperty(MessagePort.prototype, 'oninit', {
enumerable: false,
writable: false,
value: oninit
});

defineProperty(MessagePort.prototype, 'onclose', {
enumerable: false,
writable: false,
value: onclose
});

This comment has been minimized.

Copy link
@addaleax

addaleax Sep 20, 2017

Contributor

Any reason not to use defineProperties for bulk setting? :)

}

binding.makeMessagePort = makeMessagePort;

});
58 changes: 0 additions & 58 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -41,64 +41,6 @@ const debug = util.debuglog('worker');
const kUpAndRunning = MESSAGE_FLAG_CUSTOM_OFFSET;
const kLoadScript = MESSAGE_FLAG_CUSTOM_OFFSET + 1;

// A communication channel consisting of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
function onmessage(payload, flag) {
if (flag !== MESSAGE_FLAG_NONE && flag < MESSAGE_FLAG_CUSTOM_OFFSET) {
// This was not handled in C++, but it is also not a custom message in the
// sense that it was generated in JS, so some special handling is still
// required for deserialization.
// (This is primarily for error situations)
debug(`[${threadId}] received raw message`, flag, payload);
return this.emit('flaggedMessage', flag, payload);
}

debug(`[${threadId}] received message`, flag, payload);
// Emit the flag and deserialized object to userland.
if (flag === 0 || flag === undefined)
this.emit('message', payload);
else
this.emit('flaggedMessage', flag, payload);
}

Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: false,
configurable: true,
get() { return onmessage; },
set(value) {
Object.defineProperty(this, {
writable: true,
enumerable: true,
configurable: true,
value
});
this.ref();
this.start();
}
});

function oninit() {
setupPortReferencing(this, this, 'message');
}

Object.defineProperty(MessagePort.prototype, 'oninit', {
enumerable: false,
writable: false,
value: oninit
});

function onclose() {
this.emit('close');
}

Object.defineProperty(MessagePort.prototype, 'onclose', {
enumerable: false,
writable: false,
value: onclose
});

function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
Expand Down
1 change: 1 addition & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class Worker;
V(contextify_global_private_symbol, "node:contextify:global") \
V(inspector_delegate_private_symbol, "node:inspector:delegate") \
V(decorated_private_symbol, "node:decorated") \
V(messageport_initialized_private_symbol, "node:messagePortInitialized") \
V(npn_buffer_private_symbol, "node:npnBuffer") \
V(processed_private_symbol, "node:processed") \
V(sab_lifetimepartner_symbol, "node:sharedArrayBufferLiftimePartner") \
Expand Down
28 changes: 26 additions & 2 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@ using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::Private;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Undefined;
using v8::Value;
using v8::ValueDeserializer;
using v8::ValueSerializer;
Expand Down Expand Up @@ -754,8 +756,30 @@ MaybeLocal<Function> GetMessagePortConstructor(
// of code, because it is needed early on in the child environment setup.
Local<FunctionTemplate> templ;
templ = env->message_port_constructor_template();
if (!templ.IsEmpty())
return templ->GetFunction(context);
if (!templ.IsEmpty()) {
auto maybe_ctor = templ->GetFunction(context);
Local<Function> ctor;
if (!maybe_ctor.ToLocal(&ctor)) return maybe_ctor;

// Set up EventEmitter inheritance and some default listners.
Local<Private> initialized = env->messageport_initialized_private_symbol();
if (!ctor->HasPrivate(context, initialized).FromJust()) {
Local<Object> extras = context->GetExtrasBindingObject();
Local<Function> make_message_port =
extras->Get(context,
FIXED_ONE_BYTE_STRING(env->isolate(), "makeMessagePort"))
.ToLocalChecked().As<Function>();
CHECK(make_message_port->IsFunction());

This comment has been minimized.

Copy link
@addaleax

addaleax Sep 20, 2017

Contributor

By the way, you’d need to check this before the .As<Function>() call :) I think V8 blows up in debug mode otherwise

Local<Value> args[] = { ctor };
make_message_port->Call(context, Undefined(env->isolate()),
arraysize(args), args).ToLocalChecked();

This comment has been minimized.

Copy link
@addaleax

addaleax Sep 20, 2017

Contributor

I think I’ll make the TromJust()/ToLocalChecked() calls a bit more defensive, I think they could still throw (e.g. when EventEmitter.prototype is turned into a setter that throws?


ctor->SetPrivate(context, initialized, Undefined(env->isolate()))
.FromJust();
}

return maybe_ctor;
}

{
Local<FunctionTemplate> m = env->NewFunctionTemplate(MessagePort::New);
Expand Down
7 changes: 3 additions & 4 deletions test/parallel/test-message-channel-move.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,12 @@ const { MessageChannel } = require('worker');

{
assert(port instanceof Object);
assert(port.onmessage === undefined);
assert(port.onmessage instanceof Function);
assert(port.postMessage instanceof Function);
port.onmessage = function(msg) {
port.on('message', (msg) => {
assert(msg instanceof Object);
port.postMessage(msg);
};
port.start();
});
}

{
Expand Down

0 comments on commit 62a0263

Please sign in to comment.