From 470856f078dc4ff5c39ed6fa31251235d190bc80 Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Tue, 12 Mar 2019 00:42:31 +0800 Subject: [PATCH 1/3] worker: create per-Environment message port after bootstrap --- lib/internal/bootstrap/node.js | 2 +- lib/internal/process/worker_thread_only.js | 29 ++++++++------------ lib/internal/worker/io.js | 18 ++++++++++-- src/node_worker.cc | 32 +++++++++++----------- src/node_worker.h | 2 +- 5 files changed, 45 insertions(+), 38 deletions(-) diff --git a/lib/internal/bootstrap/node.js b/lib/internal/bootstrap/node.js index f8cddb3153ed20..49f72d6a0f0094 100644 --- a/lib/internal/bootstrap/node.js +++ b/lib/internal/bootstrap/node.js @@ -165,7 +165,7 @@ if (isMainThread) { setupProcessStdio(getStdout, getStdin, getStderr); } else { const { getStdout, getStdin, getStderr } = - workerThreadSetup.initializeWorkerStdio(); + workerThreadSetup.createStdioGetters(); setupProcessStdio(getStdout, getStdin, getStderr); } diff --git a/lib/internal/process/worker_thread_only.js b/lib/internal/process/worker_thread_only.js index 2cc52cbf01b8cd..b6529e43441679 100644 --- a/lib/internal/process/worker_thread_only.js +++ b/lib/internal/process/worker_thread_only.js @@ -2,32 +2,25 @@ // This file contains process bootstrappers that can only be // run in the worker thread. -const { - getEnvMessagePort -} = internalBinding('worker'); const { - kWaitingStreams, - ReadableWorkerStdio, - WritableWorkerStdio + createWorkerStdio } = require('internal/worker/io'); const { codes: { ERR_WORKER_UNSUPPORTED_OPERATION } } = require('internal/errors'); -const workerStdio = {}; - -function initializeWorkerStdio() { - const port = getEnvMessagePort(); - port[kWaitingStreams] = 0; - workerStdio.stdin = new ReadableWorkerStdio(port, 'stdin'); - workerStdio.stdout = new WritableWorkerStdio(port, 'stdout'); - workerStdio.stderr = new WritableWorkerStdio(port, 'stderr'); +let workerStdio; +function lazyWorkerStdio() { + if (!workerStdio) workerStdio = createWorkerStdio(); + return workerStdio; +} +function createStdioGetters() { return { - getStdout() { return workerStdio.stdout; }, - getStderr() { return workerStdio.stderr; }, - getStdin() { return workerStdio.stdin; } + getStdout() { return lazyWorkerStdio().stdout; }, + getStderr() { return lazyWorkerStdio().stderr; }, + getStdin() { return lazyWorkerStdio().stdin; } }; } @@ -55,7 +48,7 @@ function unavailable(name) { } module.exports = { - initializeWorkerStdio, + createStdioGetters, unavailable, wrapProcessMethods }; diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 2f1352fdf910b8..664055b5c5b9af 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -11,7 +11,10 @@ const { moveMessagePortToContext, stopMessagePort } = internalBinding('messaging'); -const { threadId } = internalBinding('worker'); +const { + threadId, + getEnvMessagePort +} = internalBinding('worker'); const { Readable, Writable } = require('stream'); const EventEmitter = require('events'); @@ -227,6 +230,16 @@ class WritableWorkerStdio extends Writable { } } +function createWorkerStdio() { + const port = getEnvMessagePort(); + port[kWaitingStreams] = 0; + return { + stdin: new ReadableWorkerStdio(port, 'stdin'), + stdout: new WritableWorkerStdio(port, 'stdout'), + stderr: new WritableWorkerStdio(port, 'stderr') + }; +} + module.exports = { drainMessagePort, messageTypes, @@ -239,5 +252,6 @@ module.exports = { MessageChannel, setupPortReferencing, ReadableWorkerStdio, - WritableWorkerStdio + WritableWorkerStdio, + createWorkerStdio }; diff --git a/src/node_worker.cc b/src/node_worker.cc index d9f7311c5d9ab1..b7ccbaffa7686f 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -269,22 +269,6 @@ void Worker::Run() { Debug(this, "Created Environment for worker with id %llu", thread_id_); if (is_stopped()) return; { - HandleScope handle_scope(isolate_); - Mutex::ScopedLock lock(mutex_); - // Set up the message channel for receiving messages in the child. - child_port_ = MessagePort::New(env_.get(), - env_->context(), - std::move(child_port_data_)); - // MessagePort::New() may return nullptr if execution is terminated - // within it. - if (child_port_ != nullptr) - env_->set_message_port(child_port_->object(isolate_)); - - Debug(this, "Created message port for worker %llu", thread_id_); - } - - if (is_stopped()) return; - { #if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR StartWorkerInspector(env_.get(), std::move(inspector_parent_handle_), @@ -296,6 +280,9 @@ void Worker::Run() { Environment::AsyncCallbackScope callback_scope(env_.get()); env_->async_hooks()->push_async_ids(1, 0); if (!RunBootstrapping(env_.get()).IsEmpty()) { + CreateEnvMessagePort(env_.get()); + if (is_stopped()) return; + Debug(this, "Created message port for worker %llu", thread_id_); USE(StartExecution(env_.get(), "internal/main/worker_thread")); } @@ -348,6 +335,19 @@ void Worker::Run() { Debug(this, "Worker %llu thread stops", thread_id_); } +void Worker::CreateEnvMessagePort(Environment* env) { + HandleScope handle_scope(isolate_); + Mutex::ScopedLock lock(mutex_); + // Set up the message channel for receiving messages in the child. + child_port_ = MessagePort::New(env, + env->context(), + std::move(child_port_data_)); + // MessagePort::New() may return nullptr if execution is terminated + // within it. + if (child_port_ != nullptr) + env->set_message_port(child_port_->object(isolate_)); +} + void Worker::JoinThread() { if (thread_joined_) return; diff --git a/src/node_worker.h b/src/node_worker.h index adc755426d03c8..9510faafe641a9 100644 --- a/src/node_worker.h +++ b/src/node_worker.h @@ -50,7 +50,7 @@ class Worker : public AsyncWrap { private: void OnThreadStopped(); - + void CreateEnvMessagePort(Environment* env); const std::string url_; std::shared_ptr per_isolate_opts_; From 6804c0bc0a4c7a35b5a0c3f21fb62d1734d5a84d Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Mon, 11 Mar 2019 23:52:47 +0800 Subject: [PATCH 2/3] process: check no handle or request is active after bootstrap --- src/node.cc | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/src/node.cc b/src/node.cc index 783962d192f5f0..779ad624e195f2 100644 --- a/src/node.cc +++ b/src/node.cc @@ -358,6 +358,17 @@ MaybeLocal RunBootstrapping(Environment* env) { .IsNothing()) return MaybeLocal(); + // Make sure that no request or handle is created during bootstrap - + // if necessary those should be done in pre-exeuction. + // TODO(joyeecheung): print requests before aborting + if (env->event_loop()->active_handles > 0) { + PrintLibuvHandleInformation(env->event_loop(), stderr); + CHECK_EQ(env->event_loop()->active_handles, 0); + } + CHECK(env->req_wrap_queue()->IsEmpty()); + CHECK(env->handle_wrap_queue()->IsEmpty()); + CHECK_EQ(env->event_loop()->active_reqs.count, 0); + env->set_has_run_bootstrapping_code(true); return scope.EscapeMaybe(result); From faf6e3bb77fd0e50fef7117b8c2dcb5a0df1fc65 Mon Sep 17 00:00:00 2001 From: Joyee Cheung Date: Tue, 19 Mar 2019 06:33:21 +0800 Subject: [PATCH 3/3] fixup! process: check no handle or request is active after bootstrap --- src/node.cc | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/src/node.cc b/src/node.cc index 779ad624e195f2..99523f9e27f873 100644 --- a/src/node.cc +++ b/src/node.cc @@ -360,14 +360,9 @@ MaybeLocal RunBootstrapping(Environment* env) { // Make sure that no request or handle is created during bootstrap - // if necessary those should be done in pre-exeuction. - // TODO(joyeecheung): print requests before aborting - if (env->event_loop()->active_handles > 0) { - PrintLibuvHandleInformation(env->event_loop(), stderr); - CHECK_EQ(env->event_loop()->active_handles, 0); - } + // TODO(joyeecheung): print handles/requests before aborting CHECK(env->req_wrap_queue()->IsEmpty()); CHECK(env->handle_wrap_queue()->IsEmpty()); - CHECK_EQ(env->event_loop()->active_reqs.count, 0); env->set_has_run_bootstrapping_code(true);