Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

worker: implement SharedArrayBuffer/MessagePort transferring #106

Merged
merged 2 commits into from
Oct 16, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
'src/node_i18n.cc',
'src/pipe_wrap.cc',
'src/process_wrap.cc',
'src/sharedarraybuffer-metadata.cc',
'src/signal_wrap.cc',
'src/spawn_sync.cc',
'src/string_bytes.cc',
Expand Down Expand Up @@ -279,6 +280,7 @@
'src/udp_wrap.h',
'src/req-wrap.h',
'src/req-wrap-inl.h',
'src/sharedarraybuffer-metadata.h',
'src/string_bytes.h',
'src/stream_base.h',
'src/stream_base-inl.h',
Expand Down
2 changes: 2 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ class ModuleWrap;
V(decorated_private_symbol, "node:decorated") \
V(npn_buffer_private_symbol, "node:npnBuffer") \
V(processed_private_symbol, "node:processed") \
V(sab_lifetimepartner_symbol, "node:sharedArrayBufferLifetimePartner") \
V(selected_npn_buffer_private_symbol, "node:selectedNpnBuffer") \
V(domain_private_symbol, "node:domain") \

Expand Down Expand Up @@ -324,6 +325,7 @@ class ModuleWrap;
V(promise_wrap_template, v8::ObjectTemplate) \
V(push_values_to_array_function, v8::Function) \
V(randombytes_constructor_template, v8::ObjectTemplate) \
V(sab_lifetimepartner_constructor_template, v8::FunctionTemplate) \
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
Expand Down
112 changes: 108 additions & 4 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Value;
using v8::ValueDeserializer;
Expand Down Expand Up @@ -55,6 +56,8 @@ Message& Message::operator=(Message&& other) {
main_message_buf_ = other.main_message_buf_;
other.main_message_buf_ = uv_buf_init(nullptr, 0);
array_buffer_contents_ = std::move(other.array_buffer_contents_);
shared_array_buffers_ = std::move(other.shared_array_buffers_);
message_ports_ = std::move(other.message_ports_);
return *this;
}

Expand All @@ -65,13 +68,25 @@ namespace {
class DeserializerDelegate : public ValueDeserializer::Delegate {
public:
DeserializerDelegate(Message* m,
Environment* env)
: env_(env), msg_(m) {}
Environment* env,
const std::vector<MessagePort*>& message_ports)
: env_(env), msg_(m), message_ports_(message_ports) {}

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Currently, only MessagePort hosts objects are supported, so identifying
// by the index in the message's MessagePort array is sufficient.
uint32_t id;
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
return message_ports_[id]->object();
};

ValueDeserializer* deserializer = nullptr;
private:
Environment* env_;
Message* msg_;
const std::vector<MessagePort*>& message_ports_;
};

} // anonymous namespace
Expand All @@ -85,13 +100,27 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
// This is for messages generated in C++ with the expectation that they
// are handled in JS, e.g. serialized error messages from workers.
CHECK(array_buffer_contents_.empty());
CHECK(shared_array_buffers_.empty());
CHECK(message_ports_.empty());
char* buf = main_message_buf_.base;
main_message_buf_.base = nullptr;
return handle_scope.Escape(
Buffer::New(env, buf, main_message_buf_.len).FromMaybe(Local<Value>()));
}

DeserializerDelegate delegate(this, env);
// Create all necessary MessagePort handles.
std::vector<MessagePort*> ports(message_ports_.size());
for (uint32_t i = 0; i < message_ports_.size(); ++i) {
ports[i] = MessagePort::New(env,
context,
nullptr,
std::move(message_ports_[i]));
if (ports[i] == nullptr)
return MaybeLocal<Value>();
}
message_ports_.clear();

DeserializerDelegate delegate(this, env, ports);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.base),
Expand All @@ -111,12 +140,30 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
}
array_buffer_contents_.clear();

for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
Local<SharedArrayBuffer> sab;
if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
.ToLocal(&sab))
return MaybeLocal<Value>();
deserializer.TransferSharedArrayBuffer(i, sab);
}
shared_array_buffers_.clear();

if (deserializer.ReadHeader(context).IsNothing())
return MaybeLocal<Value>();
return handle_scope.Escape(
deserializer.ReadValue(context).FromMaybe(Local<Value>()));
}

void Message::AddSharedArrayBuffer(
SharedArrayBufferMetadataReference reference) {
shared_array_buffers_.push_back(reference);
}

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
}

namespace {

class SerializerDelegate : public ValueSerializer::Delegate {
Expand All @@ -128,12 +175,62 @@ class SerializerDelegate : public ValueSerializer::Delegate {
env_->isolate()->ThrowException(Exception::Error(message));
}

Maybe<bool> WriteHostObject(Isolate* isolate, Local<Object> object) override {
if (env_->message_port_constructor_template()->HasInstance(object)) {
return WriteMessagePort(Unwrap<MessagePort>(object));
}

env_->ThrowError("Cannot serialize unknown type of host object");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tests would be nice although not necessary.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I know it’s kind of unfortunate that they are in a separate commit – the thing is, it just makes a lot more sense to test these methods when actually transferring data between workers…

return Nothing<bool>();
}

Maybe<uint32_t> GetSharedArrayBufferId(
Isolate* isolate, Local<SharedArrayBuffer> shared_array_buffer) override {
uint32_t i;
for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
if (seen_shared_array_buffers_[i] == shared_array_buffer)
return Just(i);
}

SharedArrayBufferMetadataReference reference(
SharedArrayBufferMetadata::ForIncomingSharedArrayBuffer(env_,
context_,
shared_array_buffer));
if (!reference) {
return Nothing<uint32_t>();
}
seen_shared_array_buffers_.push_back(shared_array_buffer);
msg_->AddSharedArrayBuffer(reference);
return Just(i);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the returned ID actually used anywhere in Ayo code or is it just for V8?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only for V8, to match the objects, really… the matching call on the deserializing side is deserializer.TransferSharedArrayBuffer(i, sab);

}

void Finish() {
for (MessagePort* port : ports_) {
port->Close();
msg_->AddMessagePort(port->Detach());
}
}

ValueSerializer* serializer = nullptr;

private:
Maybe<bool> WriteMessagePort(MessagePort* port) {
for (uint32_t i = 0; i < ports_.size(); i++) {
if (ports_[i] == port) {
serializer->WriteUint32(i);
return Just(true);
}
}

env_->ThrowError("MessagePort was not listed in transferList");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Test needed. Also TypeError.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, at least Firefox throws a special DataCloneError, which is a DOMException and therefore a plain Error

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

DOMException isn't really encouraged for new errors any more. TypeErrors and other error types that map to JS are the future for the web. Plus we don't really care about compatibility, do we?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, not really the point :) Still, why TypeError? I don’t really think the semantics here are “this thing has the wrong type”, more like, “you missed an entry in a list, please add it”, which doesn’t seem like a type mismatch to me…

Plus we don't really care about compatibility, do we?

Well … for MessagePorts/MessageChannels we kind of do? 😄 Maybe not in this first iteration but it should be doable to get some degree of compatibility

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still, why TypeError?

TypeError is used to indicate an unsuccessful operation when none of the other NativeError objects are an appropriate indication of the failure cause.

https://tc39.github.io/ecma262/#sec-native-error-types-used-in-this-standard-typeerror

Either way I just realized require('v8').Serializer.prototype._getDataCloneError is also Error, so I guess consistency trumps all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

https://tc39.github.io/ecma262/#sec-native-error-types-used-in-this-standard-typeerror

I think that’s because the language spec isn’t really concerned with these kinds of errors?

Either way I just realized require('v8').Serializer.prototype._getDataCloneError is also Error, so I guess consistency trumps all.

I mean … that’s because I wrote it that way. 😄 We definitely can change that if you think it makes sense

return Nothing<bool>();
}

Environment* env_;
Local<Context> context_;
Message* msg_;
std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
std::vector<MessagePort*> ports_;

friend class worker::Message;
};
Expand All @@ -159,7 +256,7 @@ Maybe<bool> Message::Serialize(Environment* env,
Local<Value> entry;
if (!transfer_list->Get(context, i).ToLocal(&entry))
return Nothing<bool>();
// Currently, we support ArrayBuffers.
// Currently, we support ArrayBuffers and MessagePorts.
if (entry->IsArrayBuffer()) {
Local<ArrayBuffer> ab = entry.As<ArrayBuffer>();
if (!ab->IsNeuterable() || ab->IsExternal())
Expand All @@ -168,6 +265,12 @@ Maybe<bool> Message::Serialize(Environment* env,
array_buffers.push_back(ab);
serializer.TransferArrayBuffer(id, ab);
continue;
} else if (env->message_port_constructor_template()
->HasInstance(entry)) {
MessagePort* port = Unwrap<MessagePort>(entry.As<Object>());
CHECK_NE(port, nullptr);
delegate.ports_.push_back(port);
continue;
}

env->ThrowError("Found invalid object in transferList");
Expand All @@ -188,6 +291,7 @@ Maybe<bool> Message::Serialize(Environment* env,
contents.ByteLength()));
}

delegate.Finish();
std::pair<uint8_t*, size_t> data = serializer.Release();
main_message_buf_.base = reinterpret_cast<char*>(data.first);
main_message_buf_.len = data.second;
Expand Down
12 changes: 10 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

#include "env.h"
#include "node_mutex.h"
#include "sharedarraybuffer-metadata.h"
#include <list>
#include <memory>

namespace node {
namespace worker {
Expand All @@ -27,7 +27,6 @@ class MessagePort;

// Any further flagged message codes are defined by the modules that use them.


// Represents a single communication message. The only non-standard extension
// here is passing of a separate flag that the Workers implementation uses
// for internal cross-thread information passing.
Expand All @@ -54,10 +53,19 @@ class Message {
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list);

// Internal method of Message that is called when a new SharedArrayBuffer
// object is encountered in the incoming value's structure.
void AddSharedArrayBuffer(SharedArrayBufferMetadataReference ref);
// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);

private:
int32_t flag_ = MESSAGE_FLAG_NONE;
uv_buf_t main_message_buf_;
std::vector<uv_buf_t> array_buffer_contents_;
std::vector<SharedArrayBufferMetadataReference> shared_array_buffers_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;

friend class MessagePort;
};
Expand Down
Loading