From d55ddfce3be534964817d766ea59588835d3b386 Mon Sep 17 00:00:00 2001 From: Keyhan Vakil Date: Tue, 26 Apr 2022 03:34:40 +0000 Subject: [PATCH 1/5] worker: fix stream racing with terminate `OnStreamAfterReqFinished` uses `v8::Object::Has` to check if it needs to call `oncomplete`. `v8::Object::Has` needs to execute Javascript. However when worker threads are involved, `OnStreamAfterReqFinished` may be called after the worker thread termination has begun via `worker.terminate()`. This makes `v8::Object::Has` return `Nothing`, which triggers an assert. This diff fixes the issue by simply defaulting us to `false` in the case where `Nothing` is returned. This is sound because we can't execute `oncomplete` anyway as the isolate is terminating. Fixes: https://github.com/nodejs/node/issues/38418 --- src/stream_base.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream_base.cc b/src/stream_base.cc index a47fad970355dc..16a8827433da37 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -619,7 +619,7 @@ void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( stream->ClearError(); } - if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) + if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromMaybe(false)) async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); } From cf200b528142797d9e8c70bd510fdb40743ae50f Mon Sep 17 00:00:00 2001 From: Keyhan Vakil Date: Thu, 28 Apr 2022 07:00:56 +0000 Subject: [PATCH 2/5] add test --- .../test-worker-http2-stream-terminate.js | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) create mode 100644 test/parallel/test-worker-http2-stream-terminate.js diff --git a/test/parallel/test-worker-http2-stream-terminate.js b/test/parallel/test-worker-http2-stream-terminate.js new file mode 100644 index 00000000000000..f40be3515847b2 --- /dev/null +++ b/test/parallel/test-worker-http2-stream-terminate.js @@ -0,0 +1,55 @@ +'use strict'; +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const http2 = require('http2'); +const makeDuplexPair = require('../common/duplexpair'); +const { Worker, isMainThread, parentPort } = require('worker_threads'); + +// This test ensures that workers can be terminated without error while +// stream activity is ongoing, in particular the C++ function +// ReportWritesToJSStreamListener::OnStreamAfterReqFinished. + +if (isMainThread) { + const sab = new SharedArrayBuffer(4); + const terminate = new Int32Array(sab); + + const w = new Worker(__filename); + w.postMessage(sab); + process.nextTick(() => { + Atomics.wait(terminate, 0, 0); + setImmediate(() => w.terminate()); + }); + return; +} + +parentPort.on('message', (sab) => { + const terminate = new Int32Array(sab); + const server = http2.createServer(); + let i = 0; + server.on('stream', (stream, headers) => { + if (i === 1) { + Atomics.store(terminate, 0, 1); + Atomics.notify(terminate, 0, 1); + } + i++; + + stream.end(''); + }); + + const { clientSide, serverSide } = makeDuplexPair(); + server.emit('connection', serverSide); + + const client = http2.connect('http://localhost:80', { + createConnection: () => clientSide, + }); + + function makeReq() { + for (let i = 0; i < 3; i++) { + client.request().end(); + } + setImmediate(makeReq); + } + makeReq(); + +}); From c57dd41f9980b888ae60f404915ea5a2a1a7ec16 Mon Sep 17 00:00:00 2001 From: Keyhan Vakil Date: Thu, 28 Apr 2022 07:05:12 +0000 Subject: [PATCH 3/5] lint --- src/stream_base.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stream_base.cc b/src/stream_base.cc index 16a8827433da37..96fd3b030e5e33 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -619,7 +619,8 @@ void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( stream->ClearError(); } - if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromMaybe(false)) + if (req_wrap_obj->Has(env->context(), env->oncomplete_string()) + .FromMaybe(false)) async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); } From b17b64f816c70d49cc7c063345a617c1622d0b90 Mon Sep 17 00:00:00 2001 From: Keyhan Vakil Date: Tue, 3 May 2022 16:56:13 +0000 Subject: [PATCH 4/5] follow RaisinTen's suggestions --- src/stream_base.cc | 4 +- .../test-worker-http2-stream-terminate.js | 45 ++++++++++--------- 2 files changed, 27 insertions(+), 22 deletions(-) diff --git a/src/stream_base.cc b/src/stream_base.cc index 96fd3b030e5e33..2b3fbe38ff2872 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -601,6 +601,7 @@ void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( StreamReq* req_wrap, int status) { StreamBase* stream = static_cast(stream_); Environment* env = stream->stream_env(); + if (env->is_stopping()) return; AsyncWrap* async_wrap = req_wrap->GetAsyncWrap(); HandleScope handle_scope(env->isolate()); Context::Scope context_scope(env->context()); @@ -619,8 +620,7 @@ void ReportWritesToJSStreamListener::OnStreamAfterReqFinished( stream->ClearError(); } - if (req_wrap_obj->Has(env->context(), env->oncomplete_string()) - .FromMaybe(false)) + if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) async_wrap->MakeCallback(env->oncomplete_string(), arraysize(argv), argv); } diff --git a/test/parallel/test-worker-http2-stream-terminate.js b/test/parallel/test-worker-http2-stream-terminate.js index f40be3515847b2..1599ffe4b856ad 100644 --- a/test/parallel/test-worker-http2-stream-terminate.js +++ b/test/parallel/test-worker-http2-stream-terminate.js @@ -2,6 +2,7 @@ const common = require('../common'); if (!common.hasCrypto) common.skip('missing crypto'); +const assert = require('assert'); const http2 = require('http2'); const makeDuplexPair = require('../common/duplexpair'); const { Worker, isMainThread, parentPort } = require('worker_threads'); @@ -10,27 +11,32 @@ const { Worker, isMainThread, parentPort } = require('worker_threads'); // stream activity is ongoing, in particular the C++ function // ReportWritesToJSStreamListener::OnStreamAfterReqFinished. +const MAX_ITERATIONS = 20; +const MAX_THREADS = 10; + if (isMainThread) { - const sab = new SharedArrayBuffer(4); - const terminate = new Int32Array(sab); - - const w = new Worker(__filename); - w.postMessage(sab); - process.nextTick(() => { - Atomics.wait(terminate, 0, 0); - setImmediate(() => w.terminate()); - }); - return; -} + function spinWorker(iter) { + const w = new Worker(__filename); + w.on('message', common.mustCall((msg) => { + assert.strictEqual(msg, 'terminate'); + w.terminate(); + })); + + w.on('exit', common.mustCall(() => { + if (iter < MAX_ITERATIONS) + spinWorker(++iter); + })); + } -parentPort.on('message', (sab) => { - const terminate = new Int32Array(sab); + for (let i = 0; i < MAX_THREADS; i++) { + spinWorker(0); + } +} else { const server = http2.createServer(); let i = 0; server.on('stream', (stream, headers) => { if (i === 1) { - Atomics.store(terminate, 0, 1); - Atomics.notify(terminate, 0, 1); + parentPort.postMessage('terminate'); } i++; @@ -44,12 +50,11 @@ parentPort.on('message', (sab) => { createConnection: () => clientSide, }); - function makeReq() { + function makeRequests() { for (let i = 0; i < 3; i++) { client.request().end(); } - setImmediate(makeReq); + setImmediate(makeRequests); } - makeReq(); - -}); + makeRequests(); +} From a7bdb8e036960a85aa4f1a64a472db06ff80128c Mon Sep 17 00:00:00 2001 From: Keyhan Vakil Date: Thu, 5 May 2022 23:21:28 +0000 Subject: [PATCH 5/5] fix node-test-commit-custom-suites-freestyle failure --- test/parallel/test-worker-http2-stream-terminate.js | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/test/parallel/test-worker-http2-stream-terminate.js b/test/parallel/test-worker-http2-stream-terminate.js index 1599ffe4b856ad..94e60e773c4b3e 100644 --- a/test/parallel/test-worker-http2-stream-terminate.js +++ b/test/parallel/test-worker-http2-stream-terminate.js @@ -5,7 +5,7 @@ if (!common.hasCrypto) const assert = require('assert'); const http2 = require('http2'); const makeDuplexPair = require('../common/duplexpair'); -const { Worker, isMainThread, parentPort } = require('worker_threads'); +const { Worker, parentPort } = require('worker_threads'); // This test ensures that workers can be terminated without error while // stream activity is ongoing, in particular the C++ function @@ -14,7 +14,10 @@ const { Worker, isMainThread, parentPort } = require('worker_threads'); const MAX_ITERATIONS = 20; const MAX_THREADS = 10; -if (isMainThread) { +// Do not use isMainThread so that this test itself can be run inside a Worker. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + function spinWorker(iter) { const w = new Worker(__filename); w.on('message', common.mustCall((msg) => {