diff --git a/src/module.h b/src/module.h index 542e1228..bedc87a9 100644 --- a/src/module.h +++ b/src/module.h @@ -48,10 +48,16 @@ class Module { public: explicit Module(Napi::Object exports); + ~Module() { + Terminating = true; + } + inline class Global& Global() { return *global; } + bool Terminating = false; + /* The order of properties defines their destruction in reverse order and is very important to ensure a clean process exit. During the destruction of other objects buffers might be released, we must delete trash last. */ diff --git a/src/observer.cc b/src/observer.cc index 64955949..532fb1ad 100644 --- a/src/observer.cc +++ b/src/observer.cc @@ -178,8 +178,9 @@ void Observer::Close() { /* Stop all polling and release event handlers. Callling this after setting socket to null causes a pending receive promise to be - resolved with undefined. */ - poller.Close(); + resolved with undefined. If the module is terminating, first cancel + all callbacks (they won't work anymore). */ + poller.Close(module.Terminating); } } diff --git a/src/poller.h b/src/poller.h index e51b962e..34786ee3 100644 --- a/src/poller.h +++ b/src/poller.h @@ -65,7 +65,10 @@ class Poller { /* Safely close and release all handles. This can be called before destruction to release resources early. */ - inline void Close() { + inline void Close(bool cancel = false) { + /* Cancels watched events, don't trigger. */ + if (cancel) events = 0; + /* Trigger all watched events manually, which causes any pending operation to succeed or fail immediately. */ if (events) Trigger(events); diff --git a/src/socket.cc b/src/socket.cc index b227ebcc..f4716c85 100644 --- a/src/socket.cc +++ b/src/socket.cc @@ -252,8 +252,9 @@ void Socket::Close() { endpoints = 0; } - /* Stop all polling and release event handlers. */ - poller.Close(); + /* Stop all polling and release event handlers. If the module is + terminating, first cancel all callbacks (they won't work anymore). */ + poller.Close(module.Terminating); /* Close succeeds unless socket is invalid. */ auto err = zmq_close(socket); @@ -343,8 +344,7 @@ Napi::Value Socket::Bind(const Napi::CallbackInfo& info) { state = Socket::State::Blocked; auto res = Napi::Promise::Deferred::New(Env()); - auto run_ctx = - std::make_shared(info[0].As().Utf8Value()); + auto run_ctx = std::make_shared(info[0].As()); auto status = UvQueue(Env(), [=]() { @@ -395,8 +395,7 @@ Napi::Value Socket::Unbind(const Napi::CallbackInfo& info) { state = Socket::State::Blocked; auto res = Napi::Promise::Deferred::New(Env()); - auto run_ctx = - std::make_shared(info[0].As().Utf8Value()); + auto run_ctx = std::make_shared(info[0].As()); auto status = UvQueue(Env(), [=]() { diff --git a/test/unit/context-process-exit-test.ts b/test/unit/context-process-exit-test.ts index 52a99f74..9e1b787c 100644 --- a/test/unit/context-process-exit-test.ts +++ b/test/unit/context-process-exit-test.ts @@ -1,59 +1,62 @@ /* tslint:disable: no-unused-expression */ +import * as zmq from "../../src" + import {assert} from "chai" -import {spawn} from "child_process" +import {createProcess} from "./helpers" -/* This file is in JavaScript instead of TypeScript because most code is - being evaluated with toString() and executed in a sub-process. */ describe("context process exit", function() { describe("with default context", function() { it("should occur when sockets are closed", async function() { this.slow(200) - await ensureExit(function() { - const zmq = require(".") + const code = await createProcess(() => { const socket1 = new zmq.Dealer socket1.close() const socket2 = new zmq.Router socket2.close() }) + + assert.equal(code, 0) }) it("should occur when sockets are not closed", async function() { this.slow(200) - await ensureExit(function() { - const zmq = require(".") + const code = await createProcess(() => { const socket1 = new zmq.Dealer const socket2 = new zmq.Router }) + + assert.equal(code, 0) }) it("should not occur when sockets are open and polling", async function() { this.slow(750) - await ensureNoExit(function() { - const zmq = require(".") + const code = await createProcess(() => { const socket1 = new zmq.Dealer socket1.connect("inproc://foo") socket1.receive() }) + + assert.equal(code, -1) }) }) describe("with custom context", function() { it("should occur when sockets are closed", async function() { this.slow(200) - await ensureExit(function() { - const zmq = require(".") + const code = await createProcess(() => { const context = new zmq.Context const socket1 = new zmq.Dealer({context}) socket1.close() const socket2 = new zmq.Router({context}) socket2.close() }) + + assert.equal(code, 0) }) it("should occur when sockets are closed and context is gced", async function() { this.slow(200) - await ensureExit(function() { - const zmq = require(".") + const code = await createProcess(() => { function run() { const context = new zmq.Context const socket1 = new zmq.Dealer({context}) @@ -65,68 +68,31 @@ describe("context process exit", function() { run() global.gc() }) + + assert.equal(code, 0) }) it("should occur when sockets are not closed", async function() { this.slow(200) - await ensureExit(function() { - const zmq = require(".") + const code = await createProcess(() => { const context = new zmq.Context const socket1 = new zmq.Dealer({context}) const socket2 = new zmq.Router({context}) }) + + assert.equal(code, 0) }) it("should not occur when sockets are open and polling", async function() { this.slow(750) - await ensureNoExit(function() { - const zmq = require(".") + const code = await createProcess(() => { const context = new zmq.Context const socket1 = new zmq.Dealer({context}) socket1.connect("inproc://foo") socket1.receive() }) - }) - }) -}) - -async function ensureExit(fn: () => void): Promise { - return new Promise((resolve) => { - const child = spawn(process.argv[0], ["--expose_gc"]) - child.stdin.write(`(${fn})()`) - child.stdin.end() - - child.stdout.on("data", (data: Buffer) => console.log(data.toString())) - child.stderr.on("data", (data: Buffer) => console.error(data.toString())) - child.on("close", (code: number) => { - assert.equal(code, 0) - resolve() + assert.equal(code, -1) }) - - setTimeout(() => { - resolve() - child.kill() - }, 2000) }) -} - -async function ensureNoExit(fn: () => void): Promise { - return new Promise((resolve, reject) => { - const child = spawn(process.argv[0], ["--expose_gc"]) - child.stdin.write(`(${fn})()`) - child.stdin.end() - - child.stdout.on("data", (data: Buffer) => console.log(data.toString())) - child.stderr.on("data", (data: Buffer) => console.error(data.toString())) - - child.on("close", (code: number) => { - reject(new Error(`Exit with code ${code}`)) - }) - - setTimeout(() => { - resolve() - child.kill() - }, 500) - }) -} +}) diff --git a/test/unit/helpers.ts b/test/unit/helpers.ts index ada6961c..707b385f 100644 --- a/test/unit/helpers.ts +++ b/test/unit/helpers.ts @@ -1,5 +1,7 @@ import * as path from "path" import * as semver from "semver" + +import {spawn} from "child_process" import {Worker} from "worker_threads" import * as zmq from "../../src" @@ -51,7 +53,7 @@ export function createWorker( parentPort.postMessage(msg) } - run().then(global.gc) + run() ` const worker = new Worker(src, { @@ -71,3 +73,33 @@ export function createWorker( }) }) } + +export function createProcess(fn: () => void): Promise { + const src = ` + const zmq = require(${JSON.stringify(path.resolve(__dirname, "../.."))}) + const fn = ${fn.toString()} + fn() + ` + + const child = spawn(process.argv[0], ["--expose_gc"]) + child.stdin.write(src) + child.stdin.end() + + child.stdout.on("data", (data: Buffer) => console.log(data.toString())) + child.stderr.on("data", (data: Buffer) => console.error(data.toString())) + + return new Promise((resolve, reject) => { + child.on("close", (code: number, signal: string) => { + if (signal != null) { + reject(new Error(`Child exited with ${signal}`)) + } else { + resolve(code) + } + }) + + setTimeout(() => { + resolve(-1) + child.kill() + }, 1000) + }) +} diff --git a/test/unit/socket-process-exit-test.ts b/test/unit/socket-process-exit-test.ts new file mode 100644 index 00000000..67557661 --- /dev/null +++ b/test/unit/socket-process-exit-test.ts @@ -0,0 +1,42 @@ +import * as zmq from "../../src" + +import {assert} from "chai" +import {createProcess, uniqAddress} from "./helpers" + +describe("socket process exit", function() { + it.skip("should occur cleanly when sending in exit hook", async function() { + this.slow(200) + const code = await createProcess(async () => { + const sockA = new zmq.Pair + const sockB = new zmq.Pair + await sockA.bind("inproc://test-1") + sockB.connect("inproc://test-1") + + process.on("exit", () => { + console.log("hook") + sockB.receive() + sockA.send("foo") + }) + }) + + assert.equal(code, 0) + }) + + it("should occur cleanly when reading events", async function() { + this.slow(200) + const code = await createProcess(() => { + const sock = new zmq.Dealer + + async function readEvents() { + const events = [] + for await (const event of sock.events) { + events.push(event) + } + } + + readEvents() + }) + + assert.equal(code, 0) + }) +})