diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 74f75071429c1e..03c6d5aaa76ebd 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -515,19 +515,8 @@ void Message::MemoryInfo(MemoryTracker* tracker) const { tracker->TrackField("transferables", transferables_); } -// TODO(@jasnell): The name here will be an empty string if the -// one-to-one MessageChannel is used. In such cases, -// SiblingGroup::Get() will return nothing and group_ will be -// an empty pointer. @addaleax suggests that the code here -// could be clearer if attaching the SiblingGroup were a -// separate step rather than part of the constructor here. -MessagePortData::MessagePortData( - MessagePort* owner, - const std::string& name) - : owner_(owner), - group_(SiblingGroup::Get(name)) { - if (group_) - group_->Entangle(this); +MessagePortData::MessagePortData(MessagePort* owner) + : owner_(owner) { } MessagePortData::~MessagePortData() { @@ -552,17 +541,13 @@ void MessagePortData::AddToIncomingQueue(std::shared_ptr message) { } void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { - CHECK(!a->group_); - CHECK(!b->group_); - b->group_ = a->group_ = std::make_shared(); - a->group_->Entangle(a); - a->group_->Entangle(b); + auto group = std::make_shared(); + group->Entangle({a, b}); } void MessagePortData::Disentangle() { if (group_) { group_->Disentangle(this); - group_.reset(); } } @@ -572,13 +557,12 @@ MessagePort::~MessagePort() { MessagePort::MessagePort(Environment* env, Local context, - Local wrap, - const std::string& name) + Local wrap) : HandleWrap(env, wrap, reinterpret_cast(&async_), AsyncWrap::PROVIDER_MESSAGEPORT), - data_(new MessagePortData(this, name)) { + data_(new MessagePortData(this)) { auto onmessage = [](uv_async_t* handle) { // Called when data has been put into the queue. MessagePort* channel = ContainerOf(&MessagePort::async_, handle); @@ -645,7 +629,7 @@ MessagePort* MessagePort::New( Environment* env, Local context, std::unique_ptr data, - const std::string& name) { + std::shared_ptr sibling_group) { Context::Scope context_scope(context); Local ctor_templ = GetMessagePortConstructorTemplate(env); @@ -654,7 +638,7 @@ MessagePort* MessagePort::New( Local instance; if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance)) return nullptr; - MessagePort* port = new MessagePort(env, context, instance, name); + MessagePort* port = new MessagePort(env, context, instance); CHECK_NOT_NULL(port); if (port->IsHandleClosing()) { // Construction failed with an exception. @@ -662,6 +646,7 @@ MessagePort* MessagePort::New( } if (data) { + CHECK(!sibling_group); port->Detach(); port->data_ = std::move(data); @@ -673,6 +658,8 @@ MessagePort* MessagePort::New( // If the existing MessagePortData object had pending messages, this is // the easiest way to run that queue. port->TriggerAsync(); + } else if (sibling_group) { + sibling_group->Entangle(port->data_.get()); } return port; } @@ -1067,7 +1054,7 @@ void MessagePort::MoveToContext(const FunctionCallbackInfo& args) { } void MessagePort::Entangle(MessagePort* a, MessagePort* b) { - Entangle(a, b->data_.get()); + MessagePortData::Entangle(a->data_.get(), b->data_.get()); } void MessagePort::Entangle(MessagePort* a, MessagePortData* b) { @@ -1274,7 +1261,6 @@ Maybe JSTransferable::Data::FinalizeTransferWrite( } std::shared_ptr SiblingGroup::Get(const std::string& name) { - if (name.empty()) return {}; Mutex::ScopedLock lock(SiblingGroup::groups_mutex_); std::shared_ptr group; auto i = groups_.find(name); @@ -1348,14 +1334,24 @@ Maybe SiblingGroup::Dispatch( return Just(true); } -void SiblingGroup::Entangle(MessagePortData* data) { +void SiblingGroup::Entangle(MessagePortData* port) { + Entangle({ port }); +} + +void SiblingGroup::Entangle(std::initializer_list ports) { Mutex::ScopedLock lock(group_mutex_); - ports_.insert(data); + for (MessagePortData* data : ports) { + ports_.insert(data); + CHECK(!data->group_); + data->group_ = shared_from_this(); + } } void SiblingGroup::Disentangle(MessagePortData* data) { + auto self = shared_from_this(); // Keep alive until end of function. Mutex::ScopedLock lock(group_mutex_); ports_.erase(data); + data->group_.reset(); data->AddToIncomingQueue(std::make_shared()); // If this is an anonymous group and there's another port, close it. @@ -1407,8 +1403,10 @@ static void BroadcastChannel(const FunctionCallbackInfo& args) { Context::Scope context_scope(env->context()); Utf8Value name(env->isolate(), args[0]); MessagePort* port = - MessagePort::New(env, env->context(), nullptr, std::string(*name)); - args.GetReturnValue().Set(port->object()); + MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name)); + if (port != nullptr) { + args.GetReturnValue().Set(port->object()); + } } static void InitMessaging(Local target, diff --git a/src/node_messaging.h b/src/node_messaging.h index ae8f3be6c39a38..bb30c78d865f01 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -110,7 +110,7 @@ class Message : public MemoryRetainer { friend class MessagePort; }; -class SiblingGroup { +class SiblingGroup final : public std::enable_shared_from_this { public: // Named SiblingGroup, Used for one-to-many BroadcastChannels. static std::shared_ptr Get(const std::string& name); @@ -134,7 +134,7 @@ class SiblingGroup { std::string* error = nullptr); void Entangle(MessagePortData* data); - + void Entangle(std::initializer_list data); void Disentangle(MessagePortData* data); const std::string& name() const { return name_; } @@ -159,9 +159,7 @@ class SiblingGroup { // a specific Environment/Isolate/event loop, for easier transfer between those. class MessagePortData : public TransferData { public: - explicit MessagePortData( - MessagePort* owner, - const std::string& name = std::string()); + explicit MessagePortData(MessagePort* owner); ~MessagePortData() override; MessagePortData(MessagePortData&& other) = delete; @@ -203,6 +201,7 @@ class MessagePortData : public TransferData { MessagePort* owner_ = nullptr; std::shared_ptr group_; friend class MessagePort; + friend class SiblingGroup; }; // A message port that receives messages from other threads, including @@ -216,8 +215,7 @@ class MessagePort : public HandleWrap { // creating MessagePort instances. MessagePort(Environment* env, v8::Local context, - v8::Local wrap, - const std::string& name = std::string()); + v8::Local wrap); public: ~MessagePort() override; @@ -226,8 +224,8 @@ class MessagePort : public HandleWrap { // `MessagePortData` object. static MessagePort* New(Environment* env, v8::Local context, - std::unique_ptr data = nullptr, - const std::string& name = std::string()); + std::unique_ptr data = {}, + std::shared_ptr sibling_group = {}); // Send a message, i.e. deliver it into the sibling's incoming queue. // If this port is closed, or if there is no sibling, this message is