Skip to content

Commit

Permalink
worker: add SharedArrayBuffer sharing
Browse files Browse the repository at this point in the history
Logic is added to the `MessagePort` mechanism that
attaches hidden objects to those instances when they are transferred
that track their lifetime and maintain a reference count, to make
sure that memory is freed at the appropriate times.

Thanks to Stephen Belanger for reviewing this change in its original PR.

Refs: ayojs/ayo#106

PR-URL: #20876
Reviewed-By: Gireesh Punathil <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Shingo Inoue <[email protected]>
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Tiancheng "Timothy" Gu <[email protected]>
Reviewed-By: John-David Dalton <[email protected]>
Reviewed-By: Gus Caplan <[email protected]>
  • Loading branch information
addaleax authored and targos committed Jun 7, 2018
1 parent ef2d09d commit 8149d46
Show file tree
Hide file tree
Showing 8 changed files with 274 additions and 9 deletions.
15 changes: 11 additions & 4 deletions doc/api/worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ to stringify.

`transferList` may be a list of `ArrayBuffer` and `MessagePort` objects.
After transferring, they will not be usable on the sending side of the channel
anymore (even if they are not contained in `value`).
anymore (even if they are not contained in `value`). Unlike with
[child processes][], transferring handles such as network sockets is currently
not supported.

If `value` contains [`SharedArrayBuffer`][] instances, those will be accessible
from either thread. They cannot be listed in `transferList`.

`value` may still contain `ArrayBuffer` instances that are not in
`transferList`; in that case, the underlying memory is copied rather than moved.

For more information on the serialization and deserialization mechanisms
behind this API, see the [serialization API of the `v8` module][v8.serdes].

Because the object cloning uses the structured clone algorithm,
non-enumerable properties, property accessors, and object prototypes are
not preserved. In particular, [`Buffer`][] objects will be read as
Expand All @@ -101,6 +103,9 @@ plain [`Uint8Array`][]s on the receiving side.
The message object will be cloned immediately, and can be modified after
posting without having side effects.

For more information on the serialization and deserialization mechanisms
behind this API, see the [serialization API of the `v8` module][v8.serdes].

### port.ref()
<!-- YAML
added: REPLACEME
Expand Down Expand Up @@ -137,10 +142,12 @@ be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.

[`Buffer`]: buffer.html
[child processes]: child_process.html
[`EventEmitter`]: events.html
[`MessagePort`]: #worker_class_messageport
[`port.postMessage()`]: #worker_port_postmessage_value_transferlist
[v8.serdes]: v8.html#v8_serialization_api
[`SharedArrayBuffer`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/SharedArrayBuffer
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
2 changes: 2 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@
'src/node_i18n.cc',
'src/pipe_wrap.cc',
'src/process_wrap.cc',
'src/sharedarraybuffer_metadata.cc',
'src/signal_wrap.cc',
'src/spawn_sync.cc',
'src/string_bytes.cc',
Expand Down Expand Up @@ -411,6 +412,7 @@
'src/udp_wrap.h',
'src/req_wrap.h',
'src/req_wrap-inl.h',
'src/sharedarraybuffer_metadata.h',
'src/string_bytes.h',
'src/string_decoder.h',
'src/string_decoder-inl.h',
Expand Down
2 changes: 2 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ struct PackageConfig {
V(decorated_private_symbol, "node:decorated") \
V(napi_env, "node:napi:env") \
V(napi_wrapper, "node:napi:wrapper") \
V(sab_lifetimepartner_symbol, "node:sharedArrayBufferLifetimePartner") \

// Symbols are per-isolate primitives but Environment proxies them
// for the sake of convenience.
Expand Down Expand Up @@ -338,6 +339,7 @@ struct PackageConfig {
V(promise_wrap_template, v8::ObjectTemplate) \
V(push_values_to_array_function, v8::Function) \
V(randombytes_constructor_template, v8::ObjectTemplate) \
V(sab_lifetimepartner_constructor_template, v8::FunctionTemplate) \
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
Expand Down
5 changes: 4 additions & 1 deletion src/node_errors.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ namespace node {
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \
V(ERR_MISSING_MODULE, Error) \
V(ERR_STRING_TOO_LONG, Error) \
V(ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER, TypeError) \

#define V(code, type) \
inline v8::Local<v8::Value> code(v8::Isolate* isolate, \
Expand Down Expand Up @@ -60,7 +61,9 @@ namespace node {
V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \
"MessagePort was found in message but not listed in transferList")
"MessagePort was found in message but not listed in transferList") \
V(ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER, \
"Cannot serialize externalized SharedArrayBuffer") \

#define V(code, message) \
inline v8::Local<v8::Value> code(v8::Isolate* isolate) { \
Expand Down
57 changes: 54 additions & 3 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::SharedArrayBuffer;
using v8::String;
using v8::Value;
using v8::ValueDeserializer;
Expand All @@ -43,8 +44,13 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
public:
DeserializerDelegate(Message* m,
Environment* env,
const std::vector<MessagePort*>& message_ports)
: env_(env), msg_(m), message_ports_(message_ports) {}
const std::vector<MessagePort*>& message_ports,
const std::vector<Local<SharedArrayBuffer>>&
shared_array_buffers)
: env_(env),
msg_(m),
message_ports_(message_ports),
shared_array_buffers_(shared_array_buffers) {}

MaybeLocal<Object> ReadHostObject(Isolate* isolate) override {
// Currently, only MessagePort hosts objects are supported, so identifying
Expand All @@ -56,12 +62,19 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
return message_ports_[id]->object();
};

MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
Isolate* isolate, uint32_t clone_id) override {
CHECK_LE(clone_id, shared_array_buffers_.size());
return shared_array_buffers_[clone_id];
}

ValueDeserializer* deserializer = nullptr;

private:
Environment* env_;
Message* msg_;
const std::vector<MessagePort*>& message_ports_;
const std::vector<Local<SharedArrayBuffer>>& shared_array_buffers_;
};

} // anonymous namespace
Expand All @@ -87,7 +100,18 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
}
message_ports_.clear();

DeserializerDelegate delegate(this, env, ports);
std::vector<Local<SharedArrayBuffer>> shared_array_buffers;
// Attach all transfered SharedArrayBuffers to their new Isolate.
for (uint32_t i = 0; i < shared_array_buffers_.size(); ++i) {
Local<SharedArrayBuffer> sab;
if (!shared_array_buffers_[i]->GetSharedArrayBuffer(env, context)
.ToLocal(&sab))
return MaybeLocal<Value>();
shared_array_buffers.push_back(sab);
}
shared_array_buffers_.clear();

DeserializerDelegate delegate(this, env, ports, shared_array_buffers);
ValueDeserializer deserializer(
env->isolate(),
reinterpret_cast<const uint8_t*>(main_message_buf_.data),
Expand All @@ -112,6 +136,11 @@ MaybeLocal<Value> Message::Deserialize(Environment* env,
deserializer.ReadValue(context).FromMaybe(Local<Value>()));
}

void Message::AddSharedArrayBuffer(
SharedArrayBufferMetadataReference reference) {
shared_array_buffers_.push_back(reference);
}

void Message::AddMessagePort(std::unique_ptr<MessagePortData>&& data) {
message_ports_.emplace_back(std::move(data));
}
Expand Down Expand Up @@ -139,6 +168,27 @@ class SerializerDelegate : public ValueSerializer::Delegate {
return Nothing<bool>();
}

Maybe<uint32_t> GetSharedArrayBufferId(
Isolate* isolate,
Local<SharedArrayBuffer> shared_array_buffer) override {
uint32_t i;
for (i = 0; i < seen_shared_array_buffers_.size(); ++i) {
if (seen_shared_array_buffers_[i] == shared_array_buffer)
return Just(i);
}

auto reference = SharedArrayBufferMetadata::ForSharedArrayBuffer(
env_,
context_,
shared_array_buffer);
if (!reference) {
return Nothing<uint32_t>();
}
seen_shared_array_buffers_.push_back(shared_array_buffer);
msg_->AddSharedArrayBuffer(reference);
return Just(i);
}

void Finish() {
// Only close the MessagePort handles and actually transfer them
// once we know that serialization succeeded.
Expand Down Expand Up @@ -166,6 +216,7 @@ class SerializerDelegate : public ValueSerializer::Delegate {
Environment* env_;
Local<Context> context_;
Message* msg_;
std::vector<Local<SharedArrayBuffer>> seen_shared_array_buffers_;
std::vector<MessagePort*> ports_;

friend class worker::Message;
Expand Down
6 changes: 5 additions & 1 deletion src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@

#include "env.h"
#include "node_mutex.h"
#include "sharedarraybuffer_metadata.h"
#include <list>
#include <memory>

namespace node {
namespace worker {
Expand Down Expand Up @@ -37,13 +37,17 @@ class Message {
v8::Local<v8::Value> input,
v8::Local<v8::Value> transfer_list);

// Internal method of Message that is called when a new SharedArrayBuffer
// object is encountered in the incoming value's structure.
void AddSharedArrayBuffer(SharedArrayBufferMetadataReference ref);
// Internal method of Message that is called once serialization finishes
// and that transfers ownership of `data` to this message.
void AddMessagePort(std::unique_ptr<MessagePortData>&& data);

private:
MallocedBuffer<char> main_message_buf_;
std::vector<MallocedBuffer<char>> array_buffer_contents_;
std::vector<SharedArrayBufferMetadataReference> shared_array_buffers_;
std::vector<std::unique_ptr<MessagePortData>> message_ports_;

friend class MessagePort;
Expand Down
129 changes: 129 additions & 0 deletions src/sharedarraybuffer_metadata.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
#include "sharedarraybuffer_metadata.h"
#include "base_object.h"
#include "base_object-inl.h"
#include "node_errors.h"

using v8::Context;
using v8::Function;
using v8::FunctionTemplate;
using v8::Local;
using v8::Maybe;
using v8::MaybeLocal;
using v8::Nothing;
using v8::Object;
using v8::SharedArrayBuffer;
using v8::Value;

namespace node {
namespace worker {

namespace {

// Yield a JS constructor for SABLifetimePartner objects in the form of a
// standard API object, that has a single field for containing the raw
// SABLiftimePartner* pointer.
Local<Function> GetSABLifetimePartnerConstructor(
Environment* env, Local<Context> context) {
Local<FunctionTemplate> templ;
templ = env->sab_lifetimepartner_constructor_template();
if (!templ.IsEmpty())
return templ->GetFunction(context).ToLocalChecked();

templ = BaseObject::MakeLazilyInitializedJSTemplate(env);
templ->SetClassName(FIXED_ONE_BYTE_STRING(env->isolate(),
"SABLifetimePartner"));
env->set_sab_lifetimepartner_constructor_template(templ);

return GetSABLifetimePartnerConstructor(env, context);
}

class SABLifetimePartner : public BaseObject {
public:
SABLifetimePartner(Environment* env,
Local<Object> obj,
SharedArrayBufferMetadataReference r)
: BaseObject(env, obj),
reference(r) {
MakeWeak();
}

SharedArrayBufferMetadataReference reference;
};

} // anonymous namespace

SharedArrayBufferMetadataReference
SharedArrayBufferMetadata::ForSharedArrayBuffer(
Environment* env,
Local<Context> context,
Local<SharedArrayBuffer> source) {
Local<Value> lifetime_partner;

if (!source->GetPrivate(context,
env->sab_lifetimepartner_symbol())
.ToLocal(&lifetime_partner)) {
return nullptr;
}

if (lifetime_partner->IsObject() &&
env->sab_lifetimepartner_constructor_template()
->HasInstance(lifetime_partner)) {
CHECK(source->IsExternal());
SABLifetimePartner* partner =
Unwrap<SABLifetimePartner>(lifetime_partner.As<Object>());
CHECK_NE(partner, nullptr);
return partner->reference;
}

if (source->IsExternal()) {
// If this is an external SharedArrayBuffer but we do not see a lifetime
// partner object, it was not us who externalized it. In that case, there
// is no way to serialize it, because it's unclear how the memory
// is actually owned.
THROW_ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER(env);
return nullptr;
}

SharedArrayBuffer::Contents contents = source->Externalize();
SharedArrayBufferMetadataReference r(new SharedArrayBufferMetadata(
contents.Data(), contents.ByteLength()));
if (r->AssignToSharedArrayBuffer(env, context, source).IsNothing())
return nullptr;
return r;
}

Maybe<bool> SharedArrayBufferMetadata::AssignToSharedArrayBuffer(
Environment* env, Local<Context> context,
Local<SharedArrayBuffer> target) {
CHECK(target->IsExternal());
Local<Function> ctor = GetSABLifetimePartnerConstructor(env, context);
Local<Object> obj;
if (!ctor->NewInstance(context).ToLocal(&obj))
return Nothing<bool>();

new SABLifetimePartner(env, obj, shared_from_this());
return target->SetPrivate(context,
env->sab_lifetimepartner_symbol(),
obj);
}

SharedArrayBufferMetadata::SharedArrayBufferMetadata(void* data, size_t size)
: data(data), size(size) { }

SharedArrayBufferMetadata::~SharedArrayBufferMetadata() {
free(data);
}

MaybeLocal<SharedArrayBuffer> SharedArrayBufferMetadata::GetSharedArrayBuffer(
Environment* env, Local<Context> context) {
Local<SharedArrayBuffer> obj =
SharedArrayBuffer::New(env->isolate(), data, size);

if (AssignToSharedArrayBuffer(env, context, obj).IsNothing())
return MaybeLocal<SharedArrayBuffer>();

return obj;
}

} // namespace worker
} // namespace node
Loading

0 comments on commit 8149d46

Please sign in to comment.