From 16f72492f8eb7154eca0c61197f57bc40eb655b8 Mon Sep 17 00:00:00 2001 From: legendecas Date: Tue, 27 Aug 2019 21:11:14 +0800 Subject: [PATCH] Implement AsyncProgressWorker --- doc/async_progress_worker.md | 413 +++++++++++++++++++++++++++++++++++ napi-inl.h | 87 ++++++++ napi.h | 37 ++++ test/asyncprogressworker.cc | 80 +++++++ test/asyncprogressworker.js | 38 ++++ test/binding.cc | 6 + test/binding.gyp | 1 + test/index.js | 2 + 8 files changed, 664 insertions(+) create mode 100644 doc/async_progress_worker.md create mode 100644 test/asyncprogressworker.cc create mode 100644 test/asyncprogressworker.js diff --git a/doc/async_progress_worker.md b/doc/async_progress_worker.md new file mode 100644 index 000000000..204c415c2 --- /dev/null +++ b/doc/async_progress_worker.md @@ -0,0 +1,413 @@ +# AsyncProgressWorker + +`Napi::AsyncProgressWorker` is an abstract class that you can subclass to remove +many of the tedious tasks of moving data between the event loop and worker +threads. This class internally handles all the details of creating and executing +an asynchronous operation. + +`Napi::AsyncProgressWorker` implements `Napi::AsyncWorker`, while extends it +internally with `Napi::ThreadSafeFunction` to moving work progress reports from +worker to event loop threads. + +Once created, execution is requested by calling `Napi::AsyncProgressWorker::Queue`. +When a thread is available for execution the `Napi::AsyncProgressWorker::Execute` +method will be invoked. During the execution, `Napi::AsyncProgressWorker::ExecutionProgress::Send` +could be used to indicate execution process, which would eventually invoke +`Napi::AsyncProgressWorker::OnProgress` on JavaScript thread to safely call into +JavaScript lands. Once `Napi::AsyncProgressWorker::Execute` completes either +`Napi::AsyncProgressWorker::OnOK` or `Napi::AsyncProgressWorker::OnError` will +be invoked. Once the `Napi::AsyncProgressWorker::OnOK` or `Napi::AsyncProgressWorker::OnError` +methods are complete the `Napi::AsyncProgressWorker` instance is destructed. + +For the most basic use, only the `Napi::AsyncProgressWorker::Execute` and +`Napi::AsyncProgressWorker::OnProgress` method must be implemented in a subclass. + +## Methods + +### Env + +Requests the environment in which the async worker has been initially created. + +```cpp +Napi::Env Napi::AsyncProgressWorker::Env() const; +``` + +Returns the environment in which the async worker has been created. + +### Queue + +Requests that the work be queued for execution. + +```cpp +void Napi::AsyncProgressWorker::Queue(); +``` + +### Cancel + +Cancels queued work if it has not yet been started. If it has already started +executing, it cannot be cancelled. If cancelled successfully neither +`OnOK` nor `OnError` will be called. + +```cpp +void Napi::AsyncProgressWorker::Cancel(); +``` + +### Receiver + +```cpp +Napi::ObjectReference& Napi::AsyncProgressWorker::Receiver(); +``` + +Returns the persistent object reference of the receiver object set when the async +worker was created. + +### Callback + +```cpp +Napi::FunctionReference& Napi::AsyncProgressWorker::Callback(); +``` + +Returns the persistent function reference of the callback set when the async +worker was created. The returned function reference will receive the results of +the computation that happened in the `Napi::AsyncProgressWorker::Execute` method, +unless the default implementation of `Napi::AsyncProgressWorker::OnOK` or +`Napi::AsyncProgressWorker::OnError` is overridden. + +### SuppressDestruct + +```cpp +void Napi::AsyncProgressWorker::SuppressDestruct(); +``` + +Prevents the destruction of the `Napi::AsyncProgressWorker` instance upon completion +of the `Napi::AsyncProgressWorker::OnOK` callback. + +### SetError + +Sets the error message for the error that happened during the execution. Setting +an error message will cause the `Napi::AsyncProgressWorker::OnError` method to be +invoked instead of `Napi::AsyncProgressWorker::OnOK` once the +`Napi::AsyncProgressWorker::Execute` method completes. + +```cpp +void Napi::AsyncProgressWorker::SetError(const std::string& error); +``` + +- `[in] error`: The reference to the string that represent the message of the error. + +### Execute + +This method is used to execute some tasks out of the **event loop** on a libuv +worker thread. Subclasses must implement this method and the method is run on +a thread other than that running the main event loop. As the method is not +running on the main event loop, it must avoid calling any methods from node-addon-api +or running any code that might invoke JavaScript. Instead, once this method is +complete any interaction through node-addon-api with JavaScript should be implemented +in the `Napi::AsyncProgressWorker::OnOK` method which runs on the main thread and is +invoked when the `Napi::AsyncProgressWorker::Execute` method completes. + +```cpp +virtual void Napi::AsyncProgressWorker::Execute(const ExecutionProgress& progress) = 0; +``` + +### OnOK + +This method is invoked when the computation in the `Execute` method ends. +The default implementation runs the Callback optionally provided when the +AsyncProgressWorker class was created. The callback will by default receive no +arguments. To provide arguments, override the `GetResult()` method. + +```cpp +virtual void Napi::AsyncProgressWorker::OnOK(); +``` +### GetResult + +This method returns the arguments passed to the Callback invoked by the default +`OnOK()` implementation. The default implementation returns an empty vector, +providing no arguments to the Callback. + +```cpp +virtual std::vector Napi::AsyncProgressWorker::GetResult(Napi::Env env); +``` + +### OnError + +This method is invoked after `Napi::AsyncProgressWorker::Execute` completes if an error +occurs while `Napi::AsyncProgressWorker::Execute` is running and C++ exceptions are +enabled or if an error was set through a call to `Napi::AsyncProgressWorker::SetError`. +The default implementation calls the callback provided when the `Napi::AsyncProgressWorker` +class was created, passing in the error as the first parameter. + +```cpp +virtual void Napi::AsyncProgressWorker::OnError(const Napi::Error& e); +``` + +### Destroy + +This method is invoked when the instance must be deallocated. If +`SuppressDestruct()` was not called then this method will be called after either +`OnError()` or `OnOK()` complete. The default implementation of this method +causes the instance to delete itself using the `delete` operator. The method is +provided so as to ensure that instances allocated by means other than the `new` +operator can be deallocated upon work completion. + +```cpp +virtual void Napi::AsyncProgressWorker::Destroy(); +``` + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(const Napi::Function& callback); +``` + +- `[in] callback`: The function which will be called when an asynchronous +operations ends. The given function is called from the main event loop thread. + +Returns a `Napi::AsyncProgressWorker` instance which can later be queued for +execution by calling `Queue`. + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(const Napi::Function& callback, const char* resource_name); +``` + +- `[in] callback`: The function which will be called when an asynchronous +operations ends. The given function is called from the main event loop thread. +- `[in] resource_name`: Null-terminated strings that represents the +identifier for the kind of resource that is being provided for diagnostic +information exposed by the async_hooks API. + +Returns a `Napi::AsyncProgressWorker` instance which can later be queued for execution by +calling `Napi::AsyncWork::Queue`. + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(const Napi::Function& callback, const char* resource_name, const Napi::Object& resource); +``` + +- `[in] callback`: The function which will be called when an asynchronous +operations ends. The given function is called from the main event loop thread. +- `[in] resource_name`: Null-terminated strings that represents the +identifier for the kind of resource that is being provided for diagnostic +information exposed by the async_hooks API. +- `[in] resource`: Object associated with the asynchronous operation that +will be passed to possible async_hooks. + +Returns a `Napi::AsyncProgressWorker` instance which can later be queued for execution by +calling `Napi::AsyncWork::Queue`. + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(const Napi::Object& receiver, const Napi::Function& callback); +``` + +- `[in] receiver`: The `this` object passed to the called function. +- `[in] callback`: The function which will be called when an asynchronous +operations ends. The given function is called from the main event loop thread. + +Returns a `Napi::AsyncProgressWorker` instance which can later be queued for execution by +calling `Napi::AsyncWork::Queue`. + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(const Napi::Object& receiver, const Napi::Function& callback, const char* resource_name); +``` + +- `[in] receiver`: The `this` object passed to the called function. +- `[in] callback`: The function which will be called when an asynchronous +operations ends. The given function is called from the main event loop thread. +- `[in] resource_name`: Null-terminated strings that represents the +identifier for the kind of resource that is being provided for diagnostic +information exposed by the async_hooks API. + +Returns a `Napi::AsyncWork` instance which can later be queued for execution by +calling `Napi::AsyncWork::Queue`. + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(const Napi::Object& receiver, const Napi::Function& callback, const char* resource_name, const Napi::Object& resource); +``` + +- `[in] receiver`: The `this` object passed to the called function. +- `[in] callback`: The function which will be called when an asynchronous +operations ends. The given function is called from the main event loop thread. +- `[in] resource_name`: Null-terminated strings that represents the +identifier for the kind of resource that is being provided for diagnostic +information exposed by the async_hooks API. +- `[in] resource`: Object associated with the asynchronous operation that +will be passed to possible async_hooks. + +Returns a `Napi::AsyncWork` instance which can later be queued for execution by +calling `Napi::AsyncWork::Queue`. + + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(Napi::Env env); +``` + +- `[in] env`: The environment in which to create the `Napi::AsyncProgressWorker`. + +Returns an `Napi::AsyncProgressWorker` instance which can later be queued for execution by calling +`Napi::AsyncProgressWorker::Queue`. + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(Napi::Env env, const char* resource_name); +``` + +- `[in] env`: The environment in which to create the `Napi::AsyncProgressWorker`. +- `[in] resource_name`: Null-terminated strings that represents the +identifier for the kind of resource that is being provided for diagnostic +information exposed by the async_hooks API. + +Returns a `Napi::AsyncProgressWorker` instance which can later be queued for execution by +calling `Napi::AsyncProgressWorker::Queue`. + +### Constructor + +Creates a new `Napi::AsyncProgressWorker`. + +```cpp +explicit Napi::AsyncProgressWorker(Napi::Env env, const char* resource_name, const Napi::Object& resource); +``` + +- `[in] env`: The environment in which to create the `Napi::AsyncProgressWorker`. +- `[in] resource_name`: Null-terminated strings that represents the +identifier for the kind of resource that is being provided for diagnostic +information exposed by the async_hooks API. +- `[in] resource`: Object associated with the asynchronous operation that +will be passed to possible async_hooks. + +Returns a `Napi::AsyncProgressWorker` instance which can later be queued for execution by +calling `Napi::AsyncProgressWorker::Queue`. + +### Destructor + +Deletes the created work object that is used to execute logic asynchronously. + +```cpp +virtual Napi::AsyncProgressWorker::~AsyncProgressWorker(); +``` + +## Operator + +```cpp +Napi::AsyncProgressWorker::operator napi_async_work() const; +``` + +Returns the N-API napi_async_work wrapped by the `Napi::AsyncProgressWorker` object. This +can be used to mix usage of the C N-API and node-addon-api. + +## Example + +The first step to use the `Napi::AsyncProgressWorker` class is to create a new class that +inherits from it and implement the `Napi::AsyncProgressWorker::Execute` abstract method. +Typically input to your worker will be saved within class' fields generally +passed in through its constructor. + +When the `Napi::AsyncProgressWorker::Execute` method completes without errors the +`Napi::AsyncProgressWorker::OnOK` function callback will be invoked. In this function the +results of the computation will be reassembled and returned back to the initial +JavaScript context. + +`Napi::AsyncProgressWorker` ensures that all the code in the `Napi::AsyncProgressWorker::Execute` +function runs in the background out of the **event loop** thread and at the end +the `Napi::AsyncProgressWorker::OnOK` or `Napi::AsyncProgressWorker::OnError` function will be +called and are executed as part of the event loop. + +The code below show a basic example of `Napi::AsyncProgressWorker` the implementation: + +```cpp +#include + +#include +#include + +use namespace Napi; + +class EchoWorker : public AsyncProgressWorker { + public: + EchoWorker(Function& callback, std::string& echo) + : AsyncProgressWorker(callback), echo(echo) {} + + ~EchoWorker() {} + // This code will be executed on the worker thread + void Execute(const ExecutionProgress& progress) { + // Need to simulate cpu heavy task + for (uint32_t i = 0; i < 100; ++i) { + progress.Send(&i, 1) + std::this_thread::sleep_for(std::chrono::seconds(1)); + } + } + + void OnOK() { + HandleScope scope(Env()); + Callback().Call({Env().Null(), String::New(Env(), echo)}); + } + + void OnProgress(const uint32_t* data, size_t /* count */) { + HandleScope scope(Env()); + Callback().Call({Env().Null(), Env().Null(), Number::New(Env(), data)}); + } + + private: + std::string echo; +}; +``` + +The `EchoWorker`'s contructor calls the base class' constructor to pass in the +callback that the `Napi::AsyncProgressWorker` base class will store persistently. When +the work on the `Napi::AsyncProgressWorker::Execute` method is done the +`Napi::AsyncProgressWorker::OnOk` method is called and the results return back to +JavaScript invoking the stored callback with its associated environment. + +The following code shows an example on how to create and use an `Napi::AsyncProgressWorker` + +```cpp +#include + +// Include EchoWorker class +// .. + +use namespace Napi; + +Value Echo(const CallbackInfo& info) { + // You need to check the input data here + Function cb = info[1].As(); + std::string in = info[0].As(); + EchoWorker* wk = new EchoWorker(cb, in); + wk->Queue(); + return info.Env().Undefined(); +} +``` + +Using the implementation of a `Napi::AsyncProgressWorker` is straight forward. You only +need to create a new instance and pass to its constructor the callback you want to +execute when your asynchronous task ends and other data you need for your +computation. Once created the only other action you have to do is to call the +`Napi::AsyncProgressWorker::Queue` method that will queue the created worker for execution. diff --git a/napi-inl.h b/napi-inl.h index 64315aea6..97019beca 100644 --- a/napi-inl.h +++ b/napi-inl.h @@ -9,7 +9,9 @@ // Note: Do not include this file directly! Include "napi.h" instead. +#include #include +#include #include namespace Napi { @@ -4112,6 +4114,91 @@ inline void ThreadSafeFunction::CallJS(napi_env env, Function(env, jsCallback).Call({}); } } + +//////////////////////////////////////////////////////////////////////////////// +// Async Progress Worker class +//////////////////////////////////////////////////////////////////////////////// + +template +inline AsyncProgressWorker::AsyncProgressWorker(const Function& callback, + const char* resource_name, + const Object& resource) + : AsyncWorker(callback, resource_name, resource), _asyncdata(nullptr), _asyncsize(0) { + _tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1); +} + +template +inline AsyncProgressWorker::~AsyncProgressWorker() { + // Abort pending tsfn call. + // Don't send progress events after we've already completed. + _tsfn.Abort(); + { + std::lock_guard lock(_mutex); + _asyncdata = nullptr; + _asyncsize = 0; + } + _tsfn.Release(); +} + +template +inline void AsyncProgressWorker::Execute() { + ExecutionProgress progress(this); + Execute(progress); +} + +template +inline void AsyncProgressWorker::WorkProgress_(Napi::Env /* env */, Napi::Function /* jsCallback */, void* _data) { + AsyncProgressWorker* self = static_cast(_data); + + T* data; + size_t size; + { + std::lock_guard lock(self->_mutex); + data = self->_asyncdata; + size = self->_asyncsize; + self->_asyncdata = nullptr; + self->_asyncsize = 0; + } + + self->OnProgress(data, size); + delete[] data; +} + +template +inline void AsyncProgressWorker::SendProgress_(const T* data, size_t count) { + T* new_data = new T[count]; + { + T* it = new_data; + std::copy(data, data + count, it); + } + + T* old_data; + { + std::lock_guard lock(_mutex); + old_data = _asyncdata; + _asyncdata = new_data; + _asyncsize = count; + } + _tsfn.NonBlockingCall(this, WorkProgress_); + + delete[] old_data; +} + +template +inline void AsyncProgressWorker::Signal() const { + _tsfn.NonBlockingCall(this, WorkProgress_); +} + +template +inline void AsyncProgressWorker::ExecutionProgress::Signal() const { + _that->Signal(); +} + +template +inline void AsyncProgressWorker::ExecutionProgress::Send(const T* data, size_t count) const { + _that->SendProgress_(data, count); +} + #endif //////////////////////////////////////////////////////////////////////////////// diff --git a/napi.h b/napi.h index 921aaa8bc..4d9621b49 100644 --- a/napi.h +++ b/napi.h @@ -5,6 +5,7 @@ #include #include #include +#include #include #include @@ -2061,6 +2062,42 @@ namespace Napi { std::unique_ptr _tsfn; Deleter _d; }; + + template + class AsyncProgressWorker : public AsyncWorker { + public: + virtual ~AsyncProgressWorker(); + + class ExecutionProgress { + friend class AsyncProgressWorker; + public: + void Signal() const; + void Send(const T* data, size_t count) const; + private: + explicit ExecutionProgress(AsyncProgressWorker* that) : _that(that) {} + AsyncProgressWorker* const _that; + }; + + protected: + explicit AsyncProgressWorker(const Function& callback, + const char* resource_name, + const Object& resource); + + virtual void Execute(const ExecutionProgress& progress) = 0; + virtual void OnProgress(const T* data, size_t count) = 0; + + private: + static void WorkProgress_(Napi::Env env, Napi::Function jsCallback, void* data); + + void Execute() override; + void Signal() const; + void SendProgress_(const T* data, size_t count); + + std::mutex _mutex; + T* _asyncdata; + size_t _asyncsize; + ThreadSafeFunction _tsfn; + }; #endif // Memory management. diff --git a/test/asyncprogressworker.cc b/test/asyncprogressworker.cc new file mode 100644 index 000000000..a88abec3a --- /dev/null +++ b/test/asyncprogressworker.cc @@ -0,0 +1,80 @@ +#include "napi.h" + +#include +#include +#include +#include + +#if (NAPI_VERSION > 3) + +using namespace Napi; + +namespace { + +struct ProgressData { + size_t progress; +}; + +class TestWorker : public AsyncProgressWorker { +public: + static void DoWork(const CallbackInfo& info) { + bool succeed = info[0].As(); + Object resource = info[1].As(); + Function cb = info[2].As(); + Value data = info[3]; + + TestWorker* worker = new TestWorker(cb, "TestResource", resource); + worker->Receiver().Set("data", data); + worker->_succeed = succeed; + worker->Queue(); + } + +protected: + void Execute(const ExecutionProgress& progress) override { + std::unique_lock lock(_cvm); + progress.Send(new ProgressData{0}, 1); + _cv.wait(lock); + + progress.Send(new ProgressData{50}, 1); + _cv.wait(lock); + if (!_succeed) { + SetError("test error"); + } + progress.Send(new ProgressData{75}, 1); + _cv.wait(lock); + } + + void OnProgress(const ProgressData* data, size_t /* count */) override { + FunctionReference& callback = Callback(); + Napi::Env env = Env(); + if (!callback.IsEmpty()) { + Value err = env.Undefined(); + Number progress = Number::New(env, data->progress); + callback.Call(Receiver().Value(), { err, progress }); + } + _cv.notify_one(); + } + + std::vector GetResult(Napi::Env env) override { + Value err = env.Undefined(); + Number progress = Number::New(env, 100); + return {err, progress}; + } + +private: + TestWorker(Function cb, const char* resource_name, const Object& resource) + : AsyncProgressWorker(cb, resource_name, resource) {} + std::condition_variable _cv; + std::mutex _cvm; + bool _succeed; +}; + +} + +Object InitAsyncProgressWorker(Env env) { + Object exports = Object::New(env); + exports["doWork"] = Function::New(env, TestWorker::DoWork); + return exports; +} + +#endif diff --git a/test/asyncprogressworker.js b/test/asyncprogressworker.js new file mode 100644 index 000000000..7d9fd7e42 --- /dev/null +++ b/test/asyncprogressworker.js @@ -0,0 +1,38 @@ +'use strict'; +const buildType = process.config.target_defaults.default_configuration; +const common = require('./common') +const assert = require('assert'); + +test(require(`./build/${buildType}/binding.node`)); +test(require(`./build/${buildType}/binding_noexcept.node`)); + +function test(binding) { + success(binding); + fail(binding); + return; +} + +function success(binding) { + const expected = [0, 50, 75, 100]; + const actual = []; + binding.asyncprogressworker.doWork(true, {}, common.mustCall((err, _progress) => { + if (err) { + assert.fail(err); + return; + } + actual.push(_progress); + if (actual.length === expected.length) { + assert.deepEqual(actual, expected); + } + }, expected.length)); +} + +function fail(binding) { + let err = undefined + binding.asyncprogressworker.doWork(false, {}, (_err) => { + err = _err; + }); + process.on('exit', () => { + assert.throws(() => { throw err }, /test error/) + }); +} diff --git a/test/binding.cc b/test/binding.cc index dca00771a..fecd683e8 100644 --- a/test/binding.cc +++ b/test/binding.cc @@ -5,6 +5,9 @@ using namespace Napi; Object InitArrayBuffer(Env env); Object InitAsyncContext(Env env); +#if (NAPI_VERSION > 3) +Object InitAsyncProgressWorker(Env env); +#endif Object InitAsyncWorker(Env env); Object InitPersistentAsyncWorker(Env env); Object InitBasicTypesArray(Env env); @@ -49,6 +52,9 @@ Object InitThunkingManual(Env env); Object Init(Env env, Object exports) { exports.Set("arraybuffer", InitArrayBuffer(env)); exports.Set("asynccontext", InitAsyncContext(env)); +#if (NAPI_VERSION > 3) + exports.Set("asyncprogressworker", InitAsyncProgressWorker(env)); +#endif exports.Set("asyncworker", InitAsyncWorker(env)); exports.Set("persistentasyncworker", InitPersistentAsyncWorker(env)); exports.Set("basic_types_array", InitBasicTypesArray(env)); diff --git a/test/binding.gyp b/test/binding.gyp index f8c1cb0a8..8be9e792b 100644 --- a/test/binding.gyp +++ b/test/binding.gyp @@ -7,6 +7,7 @@ 'sources': [ 'arraybuffer.cc', 'asynccontext.cc', + 'asyncprogressworker.cc', 'asyncworker.cc', 'asyncworker-persistent.cc', 'basic_types/array.cc', diff --git a/test/index.js b/test/index.js index d03c2e5b7..2f9ce35ab 100644 --- a/test/index.js +++ b/test/index.js @@ -10,6 +10,7 @@ process.config.target_defaults.default_configuration = let testModules = [ 'arraybuffer', 'asynccontext', + 'asyncprogressworker', 'asyncworker', 'asyncworker-nocallback', 'asyncworker-persistent', @@ -66,6 +67,7 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) && if ((process.env.npm_config_NAPI_VERSION !== undefined) && (process.env.npm_config_NAPI_VERSION < 4)) { + testModules.splice(testModules.indexOf('asyncprogressworker'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1); testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1); }