From 749a13b76c351d515ed489844ece575b8918d2ed Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sat, 7 Oct 2017 14:39:02 -0700 Subject: [PATCH] worker: support MessagePort passing in messages Support passing `MessagePort` instances through other `MessagePort`s, as expected by the `MessagePort` spec. Thanks to Stephen Belanger for reviewing this change in its original PR. Refs: https://github.com/ayojs/ayo/pull/106 PR-URL: https://github.com/nodejs/node/pull/20876 Reviewed-By: Gireesh Punathil Reviewed-By: Benjamin Gruenbaum Reviewed-By: Shingo Inoue Reviewed-By: Matteo Collina Reviewed-By: Tiancheng "Timothy" Gu Reviewed-By: John-David Dalton Reviewed-By: Gus Caplan --- doc/api/errors.md | 12 +++ doc/api/worker.md | 2 +- src/node_errors.h | 5 ++ src/node_messaging.cc | 80 ++++++++++++++++++- src/node_messaging.h | 5 ++ ...-message-port-message-port-transferring.js | 23 ++++++ 6 files changed, 122 insertions(+), 5 deletions(-) create mode 100644 test/parallel/test-message-port-message-port-transferring.js diff --git a/doc/api/errors.md b/doc/api/errors.md index f3e5939f258576..0a49757da3e39e 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -629,6 +629,12 @@ An operation outside the bounds of a `Buffer` was attempted. An attempt has been made to create a `Buffer` larger than the maximum allowed size. + +### ERR_CANNOT_TRANSFER_OBJECT + +The value passed to `postMessage()` contained an object that is not supported +for transferring. + ### ERR_CANNOT_WATCH_SIGINT @@ -1294,6 +1300,12 @@ strict compliance with the API specification (which in some cases may accept `func(undefined)` and `func()` are treated identically, and the [`ERR_INVALID_ARG_TYPE`][] error code may be used instead. + +### ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST + +A `MessagePort` was found in the object passed to a `postMessage()` call, +but not provided in the `transferList` for that call. + ### ERR_MISSING_MODULE diff --git a/doc/api/worker.md b/doc/api/worker.md index 4724714cd62f26..6a391c5a9e3b19 100644 --- a/doc/api/worker.md +++ b/doc/api/worker.md @@ -83,7 +83,7 @@ the [HTML structured clone algorithm][]. In particular, it may contain circular references and objects like typed arrays that the `JSON` API is not able to stringify. -`transferList` may be a list of `ArrayBuffer` objects. +`transferList` may be a list of `ArrayBuffer` and `MessagePort` objects. After transferring, they will not be usable on the sending side of the channel anymore (even if they are not contained in `value`). diff --git a/src/node_errors.h b/src/node_errors.h index 81169d241bc226..9cadb0e18533ca 100644 --- a/src/node_errors.h +++ b/src/node_errors.h @@ -23,6 +23,7 @@ namespace node { #define ERRORS_WITH_CODE(V) \ V(ERR_BUFFER_OUT_OF_BOUNDS, RangeError) \ V(ERR_BUFFER_TOO_LARGE, Error) \ + V(ERR_CANNOT_TRANSFER_OBJECT, TypeError) \ V(ERR_CLOSED_MESSAGE_PORT, Error) \ V(ERR_CONSTRUCT_CALL_REQUIRED, Error) \ V(ERR_INDEX_OUT_OF_RANGE, RangeError) \ @@ -31,6 +32,7 @@ namespace node { V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \ V(ERR_MEMORY_ALLOCATION_FAILED, Error) \ V(ERR_MISSING_ARGS, TypeError) \ + V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \ V(ERR_MISSING_MODULE, Error) \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \ V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \ @@ -57,11 +59,14 @@ namespace node { // Errors with predefined static messages #define PREDEFINED_ERROR_MESSAGES(V) \ + V(ERR_CANNOT_TRANSFER_OBJECT, "Cannot transfer object of unsupported type")\ V(ERR_CLOSED_MESSAGE_PORT, "Cannot send data on closed MessagePort") \ V(ERR_CONSTRUCT_CALL_REQUIRED, "Cannot call constructor without `new`") \ V(ERR_INDEX_OUT_OF_RANGE, "Index out of range") \ V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \ V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \ + V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \ + "MessagePort was found in message but not listed in transferList") \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \ "Script execution was interrupted by `SIGINT`") diff --git a/src/node_messaging.cc b/src/node_messaging.cc index c6e701c7d94426..1c6551e0969f3e 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -41,14 +41,27 @@ namespace { // `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them. class DeserializerDelegate : public ValueDeserializer::Delegate { public: - DeserializerDelegate(Message* m, Environment* env) - : env_(env), msg_(m) {} + DeserializerDelegate(Message* m, + Environment* env, + const std::vector& message_ports) + : env_(env), msg_(m), message_ports_(message_ports) {} + + MaybeLocal ReadHostObject(Isolate* isolate) override { + // Currently, only MessagePort hosts objects are supported, so identifying + // by the index in the message's MessagePort array is sufficient. + uint32_t id; + if (!deserializer->ReadUint32(&id)) + return MaybeLocal(); + CHECK_LE(id, message_ports_.size()); + return message_ports_[id]->object(); + }; ValueDeserializer* deserializer = nullptr; private: Environment* env_; Message* msg_; + const std::vector& message_ports_; }; } // anonymous namespace @@ -58,7 +71,23 @@ MaybeLocal Message::Deserialize(Environment* env, EscapableHandleScope handle_scope(env->isolate()); Context::Scope context_scope(context); - DeserializerDelegate delegate(this, env); + // Create all necessary MessagePort handles. + std::vector ports(message_ports_.size()); + for (uint32_t i = 0; i < message_ports_.size(); ++i) { + ports[i] = MessagePort::New(env, + context, + std::move(message_ports_[i])); + if (ports[i] == nullptr) { + for (MessagePort* port : ports) { + // This will eventually release the MessagePort object itself. + port->Close(); + } + return MaybeLocal(); + } + } + message_ports_.clear(); + + DeserializerDelegate delegate(this, env, ports); ValueDeserializer deserializer( env->isolate(), reinterpret_cast(main_message_buf_.data), @@ -83,6 +112,10 @@ MaybeLocal Message::Deserialize(Environment* env, deserializer.ReadValue(context).FromMaybe(Local())); } +void Message::AddMessagePort(std::unique_ptr&& data) { + message_ports_.emplace_back(std::move(data)); +} + namespace { // This tells V8 how to serialize objects that it does not understand @@ -97,12 +130,43 @@ class SerializerDelegate : public ValueSerializer::Delegate { env_->isolate()->ThrowException(Exception::Error(message)); } + Maybe WriteHostObject(Isolate* isolate, Local object) override { + if (env_->message_port_constructor_template()->HasInstance(object)) { + return WriteMessagePort(Unwrap(object)); + } + + THROW_ERR_CANNOT_TRANSFER_OBJECT(env_); + return Nothing(); + } + + void Finish() { + // Only close the MessagePort handles and actually transfer them + // once we know that serialization succeeded. + for (MessagePort* port : ports_) { + port->Close(); + msg_->AddMessagePort(port->Detach()); + } + } + ValueSerializer* serializer = nullptr; private: + Maybe WriteMessagePort(MessagePort* port) { + for (uint32_t i = 0; i < ports_.size(); i++) { + if (ports_[i] == port) { + serializer->WriteUint32(i); + return Just(true); + } + } + + THROW_ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST(env_); + return Nothing(); + } + Environment* env_; Local context_; Message* msg_; + std::vector ports_; friend class worker::Message; }; @@ -131,7 +195,7 @@ Maybe Message::Serialize(Environment* env, Local entry; if (!transfer_list->Get(context, i).ToLocal(&entry)) return Nothing(); - // Currently, we support ArrayBuffers. + // Currently, we support ArrayBuffers and MessagePorts. if (entry->IsArrayBuffer()) { Local ab = entry.As(); // If we cannot render the ArrayBuffer unusable in this Isolate and @@ -144,6 +208,12 @@ Maybe Message::Serialize(Environment* env, array_buffers.push_back(ab); serializer.TransferArrayBuffer(id, ab); continue; + } else if (env->message_port_constructor_template() + ->HasInstance(entry)) { + MessagePort* port = Unwrap(entry.As()); + CHECK_NE(port, nullptr); + delegate.ports_.push_back(port); + continue; } THROW_ERR_INVALID_TRANSFER_OBJECT(env); @@ -167,6 +237,8 @@ Maybe Message::Serialize(Environment* env, contents.ByteLength() }); } + delegate.Finish(); + // The serializer gave us a buffer allocated using `malloc()`. std::pair data = serializer.Release(); main_message_buf_ = diff --git a/src/node_messaging.h b/src/node_messaging.h index 7bd60163ea167c..074267bb67c2dd 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -37,9 +37,14 @@ class Message { v8::Local input, v8::Local transfer_list); + // Internal method of Message that is called once serialization finishes + // and that transfers ownership of `data` to this message. + void AddMessagePort(std::unique_ptr&& data); + private: MallocedBuffer main_message_buf_; std::vector> array_buffer_contents_; + std::vector> message_ports_; friend class MessagePort; }; diff --git a/test/parallel/test-message-port-message-port-transferring.js b/test/parallel/test-message-port-message-port-transferring.js new file mode 100644 index 00000000000000..a7490b3678ac74 --- /dev/null +++ b/test/parallel/test-message-port-message-port-transferring.js @@ -0,0 +1,23 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { MessageChannel } = require('worker'); + +{ + const { port1: basePort1, port2: basePort2 } = new MessageChannel(); + const { + port1: transferredPort1, port2: transferredPort2 + } = new MessageChannel(); + + basePort1.postMessage({ transferredPort1 }, [ transferredPort1 ]); + basePort2.on('message', common.mustCall(({ transferredPort1 }) => { + transferredPort1.postMessage('foobar'); + transferredPort2.on('message', common.mustCall((msg) => { + assert.strictEqual(msg, 'foobar'); + transferredPort1.close(common.mustCall()); + basePort1.close(common.mustCall()); + })); + })); +}