Skip to content

Commit

Permalink
Implement AsyncProgressWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
legendecas committed Sep 16, 2019
1 parent 6192e70 commit 16f7249
Show file tree
Hide file tree
Showing 8 changed files with 664 additions and 0 deletions.
413 changes: 413 additions & 0 deletions doc/async_progress_worker.md

Large diffs are not rendered by default.

87 changes: 87 additions & 0 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@

// Note: Do not include this file directly! Include "napi.h" instead.

#include <algorithm>
#include <cstring>
#include <mutex>
#include <type_traits>

namespace Napi {
Expand Down Expand Up @@ -4112,6 +4114,91 @@ inline void ThreadSafeFunction::CallJS(napi_env env,
Function(env, jsCallback).Call({});
}
}

////////////////////////////////////////////////////////////////////////////////
// Async Progress Worker class
////////////////////////////////////////////////////////////////////////////////

template<class T>
inline AsyncProgressWorker<T>::AsyncProgressWorker(const Function& callback,
const char* resource_name,
const Object& resource)
: AsyncWorker(callback, resource_name, resource), _asyncdata(nullptr), _asyncsize(0) {
_tsfn = ThreadSafeFunction::New(callback.Env(), callback, resource_name, 1, 1);
}

template<class T>
inline AsyncProgressWorker<T>::~AsyncProgressWorker() {
// Abort pending tsfn call.
// Don't send progress events after we've already completed.
_tsfn.Abort();
{
std::lock_guard<std::mutex> lock(_mutex);
_asyncdata = nullptr;
_asyncsize = 0;
}
_tsfn.Release();
}

template<class T>
inline void AsyncProgressWorker<T>::Execute() {
ExecutionProgress progress(this);
Execute(progress);
}

template<class T>
inline void AsyncProgressWorker<T>::WorkProgress_(Napi::Env /* env */, Napi::Function /* jsCallback */, void* _data) {
AsyncProgressWorker* self = static_cast<AsyncProgressWorker*>(_data);

T* data;
size_t size;
{
std::lock_guard<std::mutex> lock(self->_mutex);
data = self->_asyncdata;
size = self->_asyncsize;
self->_asyncdata = nullptr;
self->_asyncsize = 0;
}

self->OnProgress(data, size);
delete[] data;
}

template<class T>
inline void AsyncProgressWorker<T>::SendProgress_(const T* data, size_t count) {
T* new_data = new T[count];
{
T* it = new_data;
std::copy(data, data + count, it);
}

T* old_data;
{
std::lock_guard<std::mutex> lock(_mutex);
old_data = _asyncdata;
_asyncdata = new_data;
_asyncsize = count;
}
_tsfn.NonBlockingCall(this, WorkProgress_);

delete[] old_data;
}

template<class T>
inline void AsyncProgressWorker<T>::Signal() const {
_tsfn.NonBlockingCall(this, WorkProgress_);
}

template<class T>
inline void AsyncProgressWorker<T>::ExecutionProgress::Signal() const {
_that->Signal();
}

template<class T>
inline void AsyncProgressWorker<T>::ExecutionProgress::Send(const T* data, size_t count) const {
_that->SendProgress_(data, count);
}

#endif

////////////////////////////////////////////////////////////////////////////////
Expand Down
37 changes: 37 additions & 0 deletions napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <functional>
#include <initializer_list>
#include <memory>
#include <mutex>
#include <string>
#include <vector>

Expand Down Expand Up @@ -2061,6 +2062,42 @@ namespace Napi {
std::unique_ptr<napi_threadsafe_function, Deleter> _tsfn;
Deleter _d;
};

template<class T>
class AsyncProgressWorker : public AsyncWorker {
public:
virtual ~AsyncProgressWorker();

class ExecutionProgress {
friend class AsyncProgressWorker;
public:
void Signal() const;
void Send(const T* data, size_t count) const;
private:
explicit ExecutionProgress(AsyncProgressWorker* that) : _that(that) {}
AsyncProgressWorker* const _that;
};

protected:
explicit AsyncProgressWorker(const Function& callback,
const char* resource_name,
const Object& resource);

virtual void Execute(const ExecutionProgress& progress) = 0;
virtual void OnProgress(const T* data, size_t count) = 0;

private:
static void WorkProgress_(Napi::Env env, Napi::Function jsCallback, void* data);

void Execute() override;
void Signal() const;
void SendProgress_(const T* data, size_t count);

std::mutex _mutex;
T* _asyncdata;
size_t _asyncsize;
ThreadSafeFunction _tsfn;
};
#endif

// Memory management.
Expand Down
80 changes: 80 additions & 0 deletions test/asyncprogressworker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#include "napi.h"

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

#if (NAPI_VERSION > 3)

using namespace Napi;

namespace {

struct ProgressData {
size_t progress;
};

class TestWorker : public AsyncProgressWorker<ProgressData> {
public:
static void DoWork(const CallbackInfo& info) {
bool succeed = info[0].As<Boolean>();
Object resource = info[1].As<Object>();
Function cb = info[2].As<Function>();
Value data = info[3];

TestWorker* worker = new TestWorker(cb, "TestResource", resource);
worker->Receiver().Set("data", data);
worker->_succeed = succeed;
worker->Queue();
}

protected:
void Execute(const ExecutionProgress& progress) override {
std::unique_lock<std::mutex> lock(_cvm);
progress.Send(new ProgressData{0}, 1);
_cv.wait(lock);

progress.Send(new ProgressData{50}, 1);
_cv.wait(lock);
if (!_succeed) {
SetError("test error");
}
progress.Send(new ProgressData{75}, 1);
_cv.wait(lock);
}

void OnProgress(const ProgressData* data, size_t /* count */) override {
FunctionReference& callback = Callback();
Napi::Env env = Env();
if (!callback.IsEmpty()) {
Value err = env.Undefined();
Number progress = Number::New(env, data->progress);
callback.Call(Receiver().Value(), { err, progress });
}
_cv.notify_one();
}

std::vector<napi_value> GetResult(Napi::Env env) override {
Value err = env.Undefined();
Number progress = Number::New(env, 100);
return {err, progress};
}

private:
TestWorker(Function cb, const char* resource_name, const Object& resource)
: AsyncProgressWorker(cb, resource_name, resource) {}
std::condition_variable _cv;
std::mutex _cvm;
bool _succeed;
};

}

Object InitAsyncProgressWorker(Env env) {
Object exports = Object::New(env);
exports["doWork"] = Function::New(env, TestWorker::DoWork);
return exports;
}

#endif
38 changes: 38 additions & 0 deletions test/asyncprogressworker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
'use strict';
const buildType = process.config.target_defaults.default_configuration;
const common = require('./common')
const assert = require('assert');

test(require(`./build/${buildType}/binding.node`));
test(require(`./build/${buildType}/binding_noexcept.node`));

function test(binding) {
success(binding);
fail(binding);
return;
}

function success(binding) {
const expected = [0, 50, 75, 100];
const actual = [];
binding.asyncprogressworker.doWork(true, {}, common.mustCall((err, _progress) => {
if (err) {
assert.fail(err);
return;
}
actual.push(_progress);
if (actual.length === expected.length) {
assert.deepEqual(actual, expected);
}
}, expected.length));
}

function fail(binding) {
let err = undefined
binding.asyncprogressworker.doWork(false, {}, (_err) => {
err = _err;
});
process.on('exit', () => {
assert.throws(() => { throw err }, /test error/)
});
}
6 changes: 6 additions & 0 deletions test/binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ using namespace Napi;

Object InitArrayBuffer(Env env);
Object InitAsyncContext(Env env);
#if (NAPI_VERSION > 3)
Object InitAsyncProgressWorker(Env env);
#endif
Object InitAsyncWorker(Env env);
Object InitPersistentAsyncWorker(Env env);
Object InitBasicTypesArray(Env env);
Expand Down Expand Up @@ -49,6 +52,9 @@ Object InitThunkingManual(Env env);
Object Init(Env env, Object exports) {
exports.Set("arraybuffer", InitArrayBuffer(env));
exports.Set("asynccontext", InitAsyncContext(env));
#if (NAPI_VERSION > 3)
exports.Set("asyncprogressworker", InitAsyncProgressWorker(env));
#endif
exports.Set("asyncworker", InitAsyncWorker(env));
exports.Set("persistentasyncworker", InitPersistentAsyncWorker(env));
exports.Set("basic_types_array", InitBasicTypesArray(env));
Expand Down
1 change: 1 addition & 0 deletions test/binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
'sources': [
'arraybuffer.cc',
'asynccontext.cc',
'asyncprogressworker.cc',
'asyncworker.cc',
'asyncworker-persistent.cc',
'basic_types/array.cc',
Expand Down
2 changes: 2 additions & 0 deletions test/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ process.config.target_defaults.default_configuration =
let testModules = [
'arraybuffer',
'asynccontext',
'asyncprogressworker',
'asyncworker',
'asyncworker-nocallback',
'asyncworker-persistent',
Expand Down Expand Up @@ -66,6 +67,7 @@ if ((process.env.npm_config_NAPI_VERSION !== undefined) &&

if ((process.env.npm_config_NAPI_VERSION !== undefined) &&
(process.env.npm_config_NAPI_VERSION < 4)) {
testModules.splice(testModules.indexOf('asyncprogressworker'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function_ptr'), 1);
testModules.splice(testModules.indexOf('threadsafe_function/threadsafe_function'), 1);
}
Expand Down

0 comments on commit 16f7249

Please sign in to comment.