From 2e71842f633b4138baddc2982079f4590e991fa6 Mon Sep 17 00:00:00 2001 From: Kevin Eady <8634912+KevinEady@users.noreply.github.com> Date: Fri, 1 Nov 2019 21:34:58 +0100 Subject: [PATCH] tsfn: Implement copy constructor * tsfn: Implement copy constructor Refs: https://github.com/nodejs/node-addon-api/issues/524 PR-URL: https://github.com/nodejs/node-addon-api/pull/546 Reviewed-By: Michael Dawson Reviewed-By: Chengzhong Wu --- napi-inl.h | 39 ++-- napi.h | 10 +- test/binding.cc | 2 + test/binding.gyp | 1 + test/index.js | 2 + .../threadsafe_function_sum.cc | 199 ++++++++++++++++++ .../threadsafe_function_sum.js | 65 ++++++ 7 files changed, 284 insertions(+), 34 deletions(-) create mode 100644 test/threadsafe_function/threadsafe_function_sum.cc create mode 100644 test/threadsafe_function/threadsafe_function_sum.js diff --git a/napi-inl.h b/napi-inl.h index 0abfcbac4..05a7ef97c 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -4025,29 +4025,16 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, } inline ThreadSafeFunction::ThreadSafeFunction() - : _tsfn(new napi_threadsafe_function(nullptr), _d) { + : _tsfn() { } inline ThreadSafeFunction::ThreadSafeFunction( napi_threadsafe_function tsfn) - : _tsfn(new napi_threadsafe_function(tsfn), _d) { + : _tsfn(tsfn) { } -inline ThreadSafeFunction::ThreadSafeFunction(ThreadSafeFunction&& other) - : _tsfn(std::move(other._tsfn)) { - other._tsfn.reset(); -} - -inline ThreadSafeFunction& ThreadSafeFunction::operator =( - ThreadSafeFunction&& other) { - if (*_tsfn != nullptr) { - Error::Fatal("ThreadSafeFunction::operator =", - "You cannot assign a new TSFN because existing one is still alive."); - return *this; - } - _tsfn = std::move(other._tsfn); - other._tsfn.reset(); - return *this; +inline ThreadSafeFunction::operator napi_threadsafe_function() const { + return _tsfn; } inline napi_status ThreadSafeFunction::BlockingCall() const { @@ -4090,34 +4077,34 @@ inline napi_status ThreadSafeFunction::NonBlockingCall( inline void ThreadSafeFunction::Ref(napi_env env) const { if (_tsfn != nullptr) { - napi_status status = napi_ref_threadsafe_function(env, *_tsfn); + napi_status status = napi_ref_threadsafe_function(env, _tsfn); NAPI_THROW_IF_FAILED_VOID(env, status); } } inline void ThreadSafeFunction::Unref(napi_env env) const { if (_tsfn != nullptr) { - napi_status status = napi_unref_threadsafe_function(env, *_tsfn); + napi_status status = napi_unref_threadsafe_function(env, _tsfn); NAPI_THROW_IF_FAILED_VOID(env, status); } } inline napi_status ThreadSafeFunction::Acquire() const { - return napi_acquire_threadsafe_function(*_tsfn); + return napi_acquire_threadsafe_function(_tsfn); } inline napi_status ThreadSafeFunction::Release() { - return napi_release_threadsafe_function(*_tsfn, napi_tsfn_release); + return napi_release_threadsafe_function(_tsfn, napi_tsfn_release); } inline napi_status ThreadSafeFunction::Abort() { - return napi_release_threadsafe_function(*_tsfn, napi_tsfn_abort); + return napi_release_threadsafe_function(_tsfn, napi_tsfn_abort); } inline ThreadSafeFunction::ConvertibleContext ThreadSafeFunction::GetContext() const { void* context; - napi_get_threadsafe_function_context(*_tsfn, &context); + napi_get_threadsafe_function_context(_tsfn, &context); return ConvertibleContext({ context }); } @@ -4140,10 +4127,10 @@ inline ThreadSafeFunction ThreadSafeFunction::New(napi_env env, ThreadSafeFunction tsfn; auto* finalizeData = new details::ThreadSafeFinalize({ data, finalizeCallback, tsfn._tsfn.get() }); + FinalizerDataType>({ data, finalizeCallback, &tsfn._tsfn }); napi_status status = napi_create_threadsafe_function(env, callback, resource, Value::From(env, resourceName), maxQueueSize, initialThreadCount, - finalizeData, wrapper, context, CallJS, tsfn._tsfn.get()); + finalizeData, wrapper, context, CallJS, &tsfn._tsfn); if (status != napi_ok) { delete finalizeData; NAPI_THROW_IF_FAILED(env, status, ThreadSafeFunction()); @@ -4156,7 +4143,7 @@ inline napi_status ThreadSafeFunction::CallInternal( CallbackWrapper* callbackWrapper, napi_threadsafe_function_call_mode mode) const { napi_status status = napi_call_threadsafe_function( - *_tsfn, callbackWrapper, mode); + _tsfn, callbackWrapper, mode); if (status != napi_ok && callbackWrapper != nullptr) { delete callbackWrapper; } diff --git a/napi.h b/napi.h index f4a1ecb57..d3a79a1c0 100644 --- a/napi.h +++ b/napi.h @@ -2009,8 +2009,7 @@ namespace Napi { ThreadSafeFunction(); ThreadSafeFunction(napi_threadsafe_function tsFunctionValue); - ThreadSafeFunction(ThreadSafeFunction&& other); - ThreadSafeFunction& operator=(ThreadSafeFunction&& other); + operator napi_threadsafe_function() const; // This API may be called from any thread. napi_status BlockingCall() const; @@ -2082,13 +2081,8 @@ namespace Napi { napi_value jsCallback, void* context, void* data); - struct Deleter { - // napi_threadsafe_function is managed by Node.js, leave it alone. - void operator()(napi_threadsafe_function*) const {}; - }; - std::unique_ptr _tsfn; - Deleter _d; + napi_threadsafe_function _tsfn; }; template diff --git a/test/binding.cc b/test/binding.cc index 0490a85c3..e97df1350 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -41,6 +41,7 @@ Object InitObjectDeprecated(Env env); Object InitPromise(Env env); #if (NAPI_VERSION > 3) Object InitThreadSafeFunctionPtr(Env env); +Object InitThreadSafeFunctionSum(Env env); Object InitThreadSafeFunctionUnref(Env env); Object InitThreadSafeFunction(Env env); #endif @@ -90,6 +91,7 @@ Object Init(Env env, Object exports) { exports.Set("promise", InitPromise(env)); #if (NAPI_VERSION > 3) exports.Set("threadsafe_function_ptr", InitThreadSafeFunctionPtr(env)); + exports.Set("threadsafe_function_sum", InitThreadSafeFunctionSum(env)); exports.Set("threadsafe_function_unref", InitThreadSafeFunctionUnref(env)); exports.Set("threadsafe_function", InitThreadSafeFunction(env)); #endif diff --git a/test/binding.gyp b/test/binding.gyp index 769175d8b..b96febda6 100644 --- a/test/binding.gyp +++ b/test/binding.gyp @@ -36,6 +36,7 @@ 'object/set_property.cc', 'promise.cc', 'threadsafe_function/threadsafe_function_ptr.cc', + 'threadsafe_function/threadsafe_function_sum.cc', 'threadsafe_function/threadsafe_function_unref.cc', 'threadsafe_function/threadsafe_function.cc', 'typedarray.cc', diff --git a/test/index.js b/test/index.js index d68594449..38b2a5235 100644 --- a/test/index.js +++ b/test/index.js @@ -40,6 +40,7 @@ let testModules = [ 'object/set_property', 'promise', 'threadsafe_function/threadsafe_function_ptr', + 'threadsafe_function/threadsafe_function_sum', 'threadsafe_function/threadsafe_function_unref', 'threadsafe_function/threadsafe_function', 'typedarray', @@ -71,6 +72,7 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) && (process.env.npm_config_NAPI_VERSION < 4)) { testModules.splice(testModules.indexOf('asyncprogressworker'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1); + testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_sum'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_unref'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1); } diff --git a/test/threadsafe_function/threadsafe_function_sum.cc b/test/threadsafe_function/threadsafe_function_sum.cc new file mode 100644 index 000000000..bba57dd47 --- /dev/null +++ b/test/threadsafe_function/threadsafe_function_sum.cc @@ -0,0 +1,199 @@ +#include "napi.h" +#include +#include +#include +#include + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +struct TestData { + + TestData(Promise::Deferred&& deferred) : deferred(std::move(deferred)) {}; + + // Native Promise returned to JavaScript + Promise::Deferred deferred; + + // List of threads created for test. This list only ever accessed via main + // thread. + std::vector threads = {}; + + ThreadSafeFunction tsfn = ThreadSafeFunction(); +}; + +void FinalizerCallback(Napi::Env env, TestData* finalizeData){ + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { + finalizeData->threads[i].join(); + } + finalizeData->deferred.Resolve(Boolean::New(env,true)); + delete finalizeData; +} + +/** + * See threadsafe_function_sum.js for descriptions of the tests in this file + */ + +void entryWithTSFN(ThreadSafeFunction tsfn, int threadId) { + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + tsfn.BlockingCall( [=](Napi::Env env, Function callback) { + callback.Call( { Number::New(env, static_cast(threadId))}); + }); + tsfn.Release(); +} + +static Value TestWithTSFN(const CallbackInfo& info) { + int threadCount = info[0].As().Int32Value(); + Function cb = info[1].As(); + + // We pass the test data to the Finalizer for cleanup. The finalizer is + // responsible for deleting this data as well. + TestData *testData = new TestData(Promise::Deferred::New(info.Env())); + + ThreadSafeFunction tsfn = ThreadSafeFunction::New( + info.Env(), cb, "Test", 0, threadCount, + std::function(FinalizerCallback), testData); + + for (int i = 0; i < threadCount; ++i) { + // A copy of the ThreadSafeFunction will go to the thread entry point + testData->threads.push_back( std::thread(entryWithTSFN, tsfn, i) ); + } + + return testData->deferred.Promise(); +} + +// Task instance created for each new std::thread +class DelayedTSFNTask { +public: + // Each instance has its own tsfn + ThreadSafeFunction tsfn; + + // Thread-safety + std::mutex mtx; + std::condition_variable cv; + + // Entry point for std::thread + void entryDelayedTSFN(int threadId) { + std::unique_lock lk(mtx); + cv.wait(lk); + tsfn.BlockingCall([=](Napi::Env env, Function callback) { + callback.Call({Number::New(env, static_cast(threadId))}); + }); + tsfn.Release(); + }; +}; + +struct TestDataDelayed { + + TestDataDelayed(Promise::Deferred &&deferred) + : deferred(std::move(deferred)){}; + ~TestDataDelayed() { taskInsts.clear(); }; + // Native Promise returned to JavaScript + Promise::Deferred deferred; + + // List of threads created for test. This list only ever accessed via main + // thread. + std::vector threads = {}; + + // List of DelayedTSFNThread instances + std::vector> taskInsts = {}; + + ThreadSafeFunction tsfn = ThreadSafeFunction(); +}; + +void FinalizerCallbackDelayed(Napi::Env env, TestDataDelayed *finalizeData) { + for (size_t i = 0; i < finalizeData->threads.size(); ++i) { + finalizeData->threads[i].join(); + } + finalizeData->deferred.Resolve(Boolean::New(env, true)); + delete finalizeData; +} + +static Value TestDelayedTSFN(const CallbackInfo &info) { + int threadCount = info[0].As().Int32Value(); + Function cb = info[1].As(); + + TestDataDelayed *testData = + new TestDataDelayed(Promise::Deferred::New(info.Env())); + + testData->tsfn = + ThreadSafeFunction::New(info.Env(), cb, "Test", 0, threadCount, + std::function( + FinalizerCallbackDelayed), + testData); + + for (int i = 0; i < threadCount; ++i) { + testData->taskInsts.push_back( + std::unique_ptr(new DelayedTSFNTask())); + testData->threads.push_back(std::thread(&DelayedTSFNTask::entryDelayedTSFN, + testData->taskInsts.back().get(), + i)); + } + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + + for (auto &task : testData->taskInsts) { + std::lock_guard lk(task->mtx); + task->tsfn = testData->tsfn; + task->cv.notify_all(); + } + + return testData->deferred.Promise(); +} + +void entryAcquire(ThreadSafeFunction tsfn, int threadId) { + tsfn.Acquire(); + std::this_thread::sleep_for(std::chrono::milliseconds(std::rand() % 100 + 1)); + tsfn.BlockingCall( [=](Napi::Env env, Function callback) { + callback.Call( { Number::New(env, static_cast(threadId))}); + }); + tsfn.Release(); +} + +static Value CreateThread(const CallbackInfo& info) { + TestData* testData = static_cast(info.Data()); + ThreadSafeFunction tsfn = testData->tsfn; + int threadId = testData->threads.size(); + // A copy of the ThreadSafeFunction will go to the thread entry point + testData->threads.push_back( std::thread(entryAcquire, tsfn, threadId) ); + return Number::New(info.Env(), threadId); +} + +static Value StopThreads(const CallbackInfo& info) { + TestData* testData = static_cast(info.Data()); + ThreadSafeFunction tsfn = testData->tsfn; + tsfn.Release(); + return info.Env().Undefined(); +} + +static Value TestAcquire(const CallbackInfo& info) { + Function cb = info[0].As(); + Napi::Env env = info.Env(); + + // We pass the test data to the Finalizer for cleanup. The finalizer is + // responsible for deleting this data as well. + TestData *testData = new TestData(Promise::Deferred::New(info.Env())); + + testData->tsfn = ThreadSafeFunction::New( + env, cb, "Test", 0, 1, + std::function(FinalizerCallback), testData); + + Object result = Object::New(env); + result["createThread"] = Function::New( env, CreateThread, "createThread", testData); + result["stopThreads"] = Function::New( env, StopThreads, "stopThreads", testData); + result["promise"] = testData->deferred.Promise(); + + return result; +} +} + +Object InitThreadSafeFunctionSum(Env env) { + Object exports = Object::New(env); + exports["testDelayedTSFN"] = Function::New(env, TestDelayedTSFN); + exports["testWithTSFN"] = Function::New(env, TestWithTSFN); + exports["testAcquire"] = Function::New(env, TestAcquire); + return exports; +} + +#endif diff --git a/test/threadsafe_function/threadsafe_function_sum.js b/test/threadsafe_function/threadsafe_function_sum.js new file mode 100644 index 000000000..4323dabeb --- /dev/null +++ b/test/threadsafe_function/threadsafe_function_sum.js @@ -0,0 +1,65 @@ +'use strict'; +const assert = require('assert'); +const buildType = process.config.target_defaults.default_configuration; + +/** + * + * ThreadSafeFunction Tests: Thread Id Sums + * + * Every native C++ function that utilizes the TSFN will call the registered + * callback with the thread id. Passing Array.prototype.push with a bound array + * will push the thread id to the array. Therefore, starting `N` threads, we + * will expect the sum of all elements in the array to be `(N-1) * (N) / 2` (as + * thread IDs are 0-based) + * + * We check different methods of passing a ThreadSafeFunction around multiple + * threads: + * - `testWithTSFN`: The main thread creates the TSFN. Then, it creates + * threads, passing the TSFN at thread construction. The number of threads is + * static (known at TSFN creation). + * - `testDelayedTSFN`: The main thread creates threads, passing a promise to a + * TSFN at construction. Then, it creates the TSFN, and resolves each + * threads' promise. The number of threads is static. + * - `testAcquire`: The native binding returns a function to start a new. A + * call to this function will return `false` once `N` calls have been made. + * Each thread will acquire its own use of the TSFN, call it, and then + * release. + */ + +const THREAD_COUNT = 5; +const EXPECTED_SUM = (THREAD_COUNT - 1) * (THREAD_COUNT) / 2; + +module.exports = Promise.all([ + test(require(`../build/${buildType}/binding.node`)), + test(require(`../build/${buildType}/binding_noexcept.node`)) +]); + +/** @param {number[]} N */ +const sum = (N) => N.reduce((sum, n) => sum + n, 0); + +function test(binding) { + async function check(bindingFunction) { + const calls = []; + const result = await bindingFunction(THREAD_COUNT, Array.prototype.push.bind(calls)); + assert.ok(result); + assert.equal(sum(calls), EXPECTED_SUM); + } + + async function checkAcquire() { + const calls = []; + const { promise, createThread, stopThreads } = binding.threadsafe_function_sum.testAcquire(Array.prototype.push.bind(calls)); + for (let i = 0; i < THREAD_COUNT; i++) { + createThread(); + } + stopThreads(); + const result = await promise; + assert.ok(result); + assert.equal(sum(calls), EXPECTED_SUM); + } + + return Promise.all([ + check(binding.threadsafe_function_sum.testDelayedTSFN), + check(binding.threadsafe_function_sum.testWithTSFN), + checkAcquire() + ]); +}