Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: fix implementation of AsyncProgressWorker::Signal #1216

Closed
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions napi-inl.h
Original file line number Diff line number Diff line change
Expand Up @@ -5950,7 +5950,8 @@ inline AsyncProgressWorker<T>::AsyncProgressWorker(const Object& receiver,
const Object& resource)
: AsyncProgressWorkerBase(receiver, callback, resource_name, resource),
_asyncdata(nullptr),
_asyncsize(0) {}
_asyncsize(0),
_signaled(false) {}

#if NAPI_VERSION > 4
template <class T>
Expand Down Expand Up @@ -5990,12 +5991,15 @@ template <class T>
inline void AsyncProgressWorker<T>::OnWorkProgress(void*) {
T* data;
size_t size;
bool signaled;
{
std::lock_guard<std::mutex> lock(this->_mutex);
data = this->_asyncdata;
size = this->_asyncsize;
signaled = this->_signaled;
this->_asyncdata = nullptr;
this->_asyncsize = 0;
this->_signaled = false;
}

/**
Expand All @@ -6005,7 +6009,7 @@ inline void AsyncProgressWorker<T>::OnWorkProgress(void*) {
* the deferring the signal of uv_async_t is been sent again, i.e. potential
* not coalesced two calls of the TSFN callback.
*/
if (data == nullptr) {
if (data == nullptr && !signaled) {
return;
}

Expand All @@ -6024,20 +6028,25 @@ inline void AsyncProgressWorker<T>::SendProgress_(const T* data, size_t count) {
old_data = _asyncdata;
_asyncdata = new_data;
_asyncsize = count;
_signaled = false;
}
this->NonBlockingCall(nullptr);

delete[] old_data;
}

template <class T>
inline void AsyncProgressWorker<T>::Signal() const {
inline void AsyncProgressWorker<T>::Signal() {
{
std::lock_guard<std::mutex> lock(this->_mutex);
_signaled = true;
}
this->NonBlockingCall(static_cast<T*>(nullptr));
}

template <class T>
inline void AsyncProgressWorker<T>::ExecutionProgress::Signal() const {
_worker->Signal();
this->_worker->Signal();
}

template <class T>
Expand Down Expand Up @@ -6140,7 +6149,7 @@ inline void AsyncProgressQueueWorker<T>::SendProgress_(const T* data,

template <class T>
inline void AsyncProgressQueueWorker<T>::Signal() const {
this->NonBlockingCall(nullptr);
this->SendProgress_(static_cast<T*>(nullptr), 0);
}

template <class T>
Expand All @@ -6152,7 +6161,7 @@ inline void AsyncProgressQueueWorker<T>::OnWorkComplete(Napi::Env env,

template <class T>
inline void AsyncProgressQueueWorker<T>::ExecutionProgress::Signal() const {
_worker->Signal();
_worker->SendProgress_(static_cast<T*>(nullptr), 0);
}

template <class T>
Expand Down
3 changes: 2 additions & 1 deletion napi.h
Original file line number Diff line number Diff line change
Expand Up @@ -3007,12 +3007,13 @@ class AsyncProgressWorker : public AsyncProgressWorkerBase<void> {

private:
void Execute() override;
void Signal() const;
void Signal();
void SendProgress_(const T* data, size_t count);

std::mutex _mutex;
T* _asyncdata;
size_t _asyncsize;
bool _signaled;
};

template <class T>
Expand Down
16 changes: 13 additions & 3 deletions test/async_progress_queue_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class TestWorker : public AsyncProgressQueueWorker<ProgressData> {

if (_times < 0) {
SetError("test error");
} else {
progress.Signal();
}
ProgressData data{0};
for (int32_t idx = 0; idx < _times; idx++) {
Expand All @@ -49,11 +51,18 @@ class TestWorker : public AsyncProgressQueueWorker<ProgressData> {
}
}

void OnProgress(const ProgressData* data, size_t /* count */) override {
void OnProgress(const ProgressData* data, size_t count) override {
Napi::Env env = Env();
_test_case_count++;
if (!_js_progress_cb.IsEmpty()) {
Number progress = Number::New(env, data->progress);
_js_progress_cb.Call(Receiver().Value(), {progress});
if (_test_case_count == 1) {
if (count != 0) {
SetError("expect 0 count of data on 1st call");
}
} else {
Number progress = Number::New(env, data->progress);
_js_progress_cb.Call(Receiver().Value(), {progress});
}
}
}

Expand All @@ -68,6 +77,7 @@ class TestWorker : public AsyncProgressQueueWorker<ProgressData> {
}

int32_t _times;
size_t _test_case_count = 0;
FunctionReference _js_progress_cb;
};

Expand Down
74 changes: 65 additions & 9 deletions test/async_progress_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,17 @@ class MalignWorker : public AsyncProgressWorker<ProgressData> {

protected:
void Execute(const ExecutionProgress& progress) override {
std::unique_lock<std::mutex> lock(_cvm);
// Testing a nullptr send is acceptable.
progress.Send(nullptr, 0);
_cv.wait(lock);
{
std::unique_lock<std::mutex> lock(_cvm);
// Testing a nullptr send is acceptable.
progress.Send(nullptr, 0);
_cv.wait(lock, [this] { return _test_case_count == 1; });
}
{
std::unique_lock<std::mutex> lock(_cvm);
progress.Signal();
_cv.wait(lock, [this] { return _test_case_count == 2; });
}
// Testing busy looping on send doesn't trigger unexpected empty data
// OnProgress call.
for (size_t i = 0; i < 1000000; i++) {
Expand All @@ -92,16 +99,21 @@ class MalignWorker : public AsyncProgressWorker<ProgressData> {

void OnProgress(const ProgressData* /* data */, size_t count) override {
Napi::Env env = Env();
_test_case_count++;
{
std::lock_guard<std::mutex> lock(_cvm);
_test_case_count++;
}
bool error = false;
Napi::String reason = Napi::String::New(env, "No error");
if (_test_case_count == 1 && count != 0) {
if (_test_case_count <= 2 && count != 0) {
error = true;
reason = Napi::String::New(env, "expect 0 count of data on 1st call");
reason =
Napi::String::New(env, "expect 0 count of data on 1st and 2nd call");
}
if (_test_case_count > 1 && count != 1) {
if (_test_case_count > 2 && count != 1) {
error = true;
reason = Napi::String::New(env, "expect 1 count of data on non-1st call");
reason = Napi::String::New(
env, "expect 1 count of data on non-1st and non-2nd call");
}
_progress.MakeCallback(Receiver().Value(),
{Napi::Boolean::New(env, error), reason});
Expand All @@ -122,12 +134,56 @@ class MalignWorker : public AsyncProgressWorker<ProgressData> {
std::mutex _cvm;
FunctionReference _progress;
};

// Calling a Signal after a SendProgress should not clear progress data
class SignalAfterProgressTestWorker : public AsyncProgressWorker<ProgressData> {
public:
static void DoWork(const CallbackInfo& info) {
Function cb = info[0].As<Function>();
Function progress = info[1].As<Function>();

SignalAfterProgressTestWorker* worker = new SignalAfterProgressTestWorker(
cb, progress, "TestResource", Object::New(info.Env()));
worker->Queue();
}

protected:
void Execute(const ExecutionProgress& progress) override {
ProgressData data{0};
progress.Send(&data, 1);
progress.Signal();
}

void OnProgress(const ProgressData* /* data */, size_t count) override {
Napi::Env env = Env();
bool error = false;
Napi::String reason = Napi::String::New(env, "No error");
if (count != 1) {
error = true;
reason = Napi::String::New(env, "expect 1 count of data");
}
_progress.MakeCallback(Receiver().Value(),
{Napi::Boolean::New(env, error), reason});
}

private:
SignalAfterProgressTestWorker(Function cb,
Function progress,
const char* resource_name,
const Object& resource)
: AsyncProgressWorker(cb, resource_name, resource) {
_progress.Reset(progress, 1);
}
FunctionReference _progress;
};
} // namespace

Object InitAsyncProgressWorker(Env env) {
Object exports = Object::New(env);
exports["doWork"] = Function::New(env, TestWorker::DoWork);
exports["doMalignTest"] = Function::New(env, MalignWorker::DoWork);
exports["doSignalAfterProgressTest"] =
Function::New(env, SignalAfterProgressTestWorker::DoWork);
return exports;
}

Expand Down
13 changes: 9 additions & 4 deletions test/async_progress_worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ module.exports = common.runTest(test);
async function test ({ asyncprogressworker }) {
await success(asyncprogressworker);
await fail(asyncprogressworker);
await malignTest(asyncprogressworker);
await signalTest(asyncprogressworker.doMalignTest);
await signalTest(asyncprogressworker.doSignalAfterProgressTest);
}

function success (binding) {
Expand Down Expand Up @@ -44,17 +45,21 @@ function fail (binding) {
});
}

function malignTest (binding) {
function signalTest (bindingFunction) {
return new Promise((resolve, reject) => {
binding.doMalignTest(
bindingFunction(
common.mustCall((err) => {
if (err) {
return reject(err);
}
resolve();
}),
common.mustCallAtLeast((error, reason) => {
assert(!error, reason);
try {
assert(!error, reason);
} catch (e) {
reject(e);
}
}, 1)
);
});
Expand Down