Skip to content

Commit

Permalink
Implement AsyncProgressQueueWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Jan 26, 2020
1 parent 630c005 commit 01d157c
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 0 deletions.
117 changes: 117 additions & 0 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -4771,6 +4771,123 @@ template<class T>
inline void AsyncProgressWorker<T>::ExecutionProgress::Send(const T* data, size_t count) const {
_worker->SendProgress_(data, count);
}

////////////////////////////////////////////////////////////////////////////////
// Async Progress Queue Worker class
////////////////////////////////////////////////////////////////////////////////
template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Function& callback)
: AsyncProgressQueueWorker(callback, "generic") {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Function& callback,
const char* resource_name)
: AsyncProgressQueueWorker(callback, resource_name, Object::New(callback.Env())) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncProgressQueueWorker(Object::New(callback.Env()),
callback,
resource_name,
resource) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Object& receiver,
const Function& callback)
: AsyncProgressQueueWorker(receiver, callback, "generic") {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Object& receiver,
const Function& callback,
const char* resource_name)
: AsyncProgressQueueWorker(receiver,
callback,
resource_name,
Object::New(callback.Env())) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(const Object& receiver,
const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncProgressWorkerBase<std::pair<T*, size_t>>(receiver, callback, resource_name, resource, /** unlimited queue size */0) {
}

#if NAPI_VERSION > 4
template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(Napi::Env env)
: AsyncProgressQueueWorker(env, "generic") {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(Napi::Env env,
const char* resource_name)
: AsyncProgressQueueWorker(env, resource_name, Object::New(env)) {
}

template<class T>
inline AsyncProgressQueueWorker<T>::AsyncProgressQueueWorker(Napi::Env env,
const char* resource_name,
const Object& resource)
: AsyncProgressWorkerBase<std::pair<T*, size_t>>(env, resource_name, resource, /** unlimited queue size */0) {
}
#endif

template<class T>
inline void AsyncProgressQueueWorker<T>::Execute() {
ExecutionProgress progress(this);
Execute(progress);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::OnWorkProgress(std::pair<T*, size_t>* datapair) {
if (datapair == nullptr) {
return;
}

T *data = datapair->first;
size_t size = datapair->second;

this->OnProgress(data, size);
delete datapair;
delete[] data;
}

template<class T>
inline void AsyncProgressQueueWorker<T>::SendProgress_(const T* data, size_t count) {
T* new_data = new T[count];
std::copy(data, data + count, new_data);

auto pair = new std::pair<T*, size_t>(new_data, count);
this->NonBlockingCall(pair);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::Signal() const {
this->NonBlockingCall(nullptr);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::OnWorkComplete(Napi::Env env, napi_status status) {
AsyncWorker::OnWorkComplete(env, status);
}

template<class T>
inline void AsyncProgressQueueWorker<T>::ExecutionProgress::Signal() const {
_worker->Signal();
}

template<class T>
inline void AsyncProgressQueueWorker<T>::ExecutionProgress::Send(const T* data, size_t count) const {
_worker->SendProgress_(data, count);
}
#endif

////////////////////////////////////////////////////////////////////////////////
Expand Down
54 changes: 54 additions & 0 deletions napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -2350,6 +2350,60 @@ namespace Napi {
T* _asyncdata;
size_t _asyncsize;
};

template<class T>
class AsyncProgressQueueWorker : public AsyncProgressWorkerBase<std::pair<T*, size_t>> {
public:
virtual ~AsyncProgressQueueWorker() {};

class ExecutionProgress {
friend class AsyncProgressQueueWorker;
public:
void Signal() const;
void Send(const T* data, size_t count) const;
private:
explicit ExecutionProgress(AsyncProgressQueueWorker* worker) : _worker(worker) {}
AsyncProgressQueueWorker* const _worker;
};

void OnWorkComplete(Napi::Env env, napi_status status) override;
void OnWorkProgress(std::pair<T*, size_t>*) override;

protected:
explicit AsyncProgressQueueWorker(const Function& callback);
explicit AsyncProgressQueueWorker(const Function& callback,
const char* resource_name);
explicit AsyncProgressQueueWorker(const Function& callback,
const char* resource_name,
const Object& resource);
explicit AsyncProgressQueueWorker(const Object& receiver,
const Function& callback);
explicit AsyncProgressQueueWorker(const Object& receiver,
const Function& callback,
const char* resource_name);
explicit AsyncProgressQueueWorker(const Object& receiver,
const Function& callback,
const char* resource_name,
const Object& resource);

// Optional callback of Napi::ThreadSafeFunction only available after NAPI_VERSION 4.
// Refs: https://github.com/nodejs/node/pull/27791
#if NAPI_VERSION > 4
explicit AsyncProgressQueueWorker(Napi::Env env);
explicit AsyncProgressQueueWorker(Napi::Env env,
const char* resource_name);
explicit AsyncProgressQueueWorker(Napi::Env env,
const char* resource_name,
const Object& resource);
#endif
virtual void Execute(const ExecutionProgress& progress) = 0;
virtual void OnProgress(const T* data, size_t count) = 0;

private:
void Execute() override;
void Signal() const;
void SendProgress_(const T* data, size_t count);
};
#endif

// Memory management.
Expand Down
78 changes: 78 additions & 0 deletions test/asyncprogressqueueworker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#include "napi.h"

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

#if (NAPI_VERSION > 3)

using namespace Napi;

namespace {

struct ProgressData {
int32_t progress;
};

class TestWorker : public AsyncProgressQueueWorker<ProgressData> {
public:
static void DoWork(const CallbackInfo& info) {
int32_t times = info[0].As<Number>().Int32Value();
Function cb = info[1].As<Function>();
Function progress = info[2].As<Function>();

TestWorker* worker = new TestWorker(cb, progress, "TestResource", Object::New(info.Env()));
worker->_times = times;
worker->Queue();
}

protected:
void Execute(const ExecutionProgress& progress) override {
if (_times < 0) {
SetError("test error");
}
ProgressData data{0};
for (int32_t idx = 0; idx < _times; idx++) {
data.progress = idx;
progress.Send(&data, 1);
}
// keep worker alive until we processed all progress.
if (_times > 0) {
std::unique_lock<std::mutex> lock(_cvm);
_cv.wait(lock);
}
}

void OnProgress(const ProgressData* data, size_t /* count */) override {
Napi::Env env = Env();
if (!_progress.IsEmpty()) {
Number progress = Number::New(env, data->progress);
_progress.MakeCallback(Receiver().Value(), { progress });
}
if (data->progress + 1 == _times) {
_cv.notify_one();
}
}

private:
TestWorker(Function cb, Function progress, const char* resource_name, const Object& resource)
: AsyncProgressQueueWorker(cb, resource_name, resource) {
_progress.Reset(progress, 1);
}

std::condition_variable _cv;
std::mutex _cvm;
int32_t _times;
FunctionReference _progress;
};

}

Object InitAsyncProgressQueueWorker(Env env) {
Object exports = Object::New(env);
exports["doWork"] = Function::New(env, TestWorker::DoWork);
return exports;
}

#endif
42 changes: 42 additions & 0 deletions test/asyncprogressqueueworker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
'use strict';
const buildType = process.config.target_defaults.default_configuration;
const common = require('./common')
const assert = require('assert');

test(require(`./build/${buildType}/binding.node`));
test(require(`./build/${buildType}/binding_noexcept.node`));

function test({ asyncprogressqueueworker }) {
success(asyncprogressqueueworker);
fail(asyncprogressqueueworker);
return;
}

function success(binding) {
const expected = [0, 1, 2, 3];
const actual = [];
binding.doWork(expected.length,
common.mustCall((err) => {
if (err) {
assert.fail(err);
}
}),
common.mustCall((_progress) => {
actual.push(_progress);
if (actual.length === expected.length) {
assert.deepEqual(actual, expected);
}
}, expected.length)
);
}

function fail(binding) {
binding.doWork(-1,
common.mustCall((err) => {
assert.throws(() => { throw err }, /test error/)
}),
() => {
assert.fail('unexpected progress report');
}
);
}
2 changes: 2 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ using namespace Napi;
Object InitArrayBuffer(Env env);
Object InitAsyncContext(Env env);
#if (NAPI_VERSION > 3)
Object InitAsyncProgressQueueWorker(Env env);
Object InitAsyncProgressWorker(Env env);
#endif
Object InitAsyncWorker(Env env);
Expand Down Expand Up @@ -60,6 +61,7 @@ Object Init(Env env, Object exports) {
exports.Set("arraybuffer", InitArrayBuffer(env));
exports.Set("asynccontext", InitAsyncContext(env));
#if (NAPI_VERSION > 3)
exports.Set("asyncprogressqueueworker", InitAsyncProgressQueueWorker(env));
exports.Set("asyncprogressworker", InitAsyncProgressWorker(env));
#endif
exports.Set("asyncworker", InitAsyncWorker(env));
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
'sources': [
'arraybuffer.cc',
'asynccontext.cc',
'asyncprogressqueueworker.cc',
'asyncprogressworker.cc',
'asyncworker.cc',
'asyncworker-persistent.cc',
Expand Down
2 changes: 2 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ process.config.target_defaults.default_configuration =
let testModules = [
'arraybuffer',
'asynccontext',
'asyncprogressqueueworker',
'asyncprogressworker',
'asyncworker',
'asyncworker-nocallback',
Expand Down Expand Up @@ -72,6 +73,7 @@ if (napiVersion < 3) {
}

if (napiVersion < 4) {
testModules.splice(testModules.indexOf('asyncprogressqueueworker'), 1);
testModules.splice(testModules.indexOf('asyncprogressworker'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ctx'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_existing_tsfn'), 1);
Expand Down

0 comments on commit 01d157c

Please sign in to comment.