Skip to content

Commit

Permalink
Don't trigger events when module (environment) is terminating.
Browse files Browse the repository at this point in the history
  • Loading branch information
rolftimmermans committed Nov 4, 2019
1 parent 99712aa commit 61da93b
Show file tree
Hide file tree
Showing 7 changed files with 117 additions and 68 deletions.
6 changes: 6 additions & 0 deletions src/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,16 @@ class Module {
public:
explicit Module(Napi::Object exports);

~Module() {
Terminating = true;
}

inline class Global& Global() {
return *global;
}

bool Terminating = false;

/* The order of properties defines their destruction in reverse order and is
very important to ensure a clean process exit. During the destruction of
other objects buffers might be released, we must delete trash last. */
Expand Down
5 changes: 3 additions & 2 deletions src/observer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,9 @@ void Observer::Close() {

/* Stop all polling and release event handlers. Callling this after
setting socket to null causes a pending receive promise to be
resolved with undefined. */
poller.Close();
resolved with undefined. If the module is terminating, first cancel
all callbacks (they won't work anymore). */
poller.Close(module.Terminating);
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/poller.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class Poller {

/* Safely close and release all handles. This can be called before
destruction to release resources early. */
inline void Close() {
inline void Close(bool cancel = false) {
/* Cancels watched events, don't trigger. */
if (cancel) events = 0;

/* Trigger all watched events manually, which causes any pending
operation to succeed or fail immediately. */
if (events) Trigger(events);
Expand Down
11 changes: 5 additions & 6 deletions src/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -252,8 +252,9 @@ void Socket::Close() {
endpoints = 0;
}

/* Stop all polling and release event handlers. */
poller.Close();
/* Stop all polling and release event handlers. If the module is
terminating, first cancel all callbacks (they won't work anymore). */
poller.Close(module.Terminating);

/* Close succeeds unless socket is invalid. */
auto err = zmq_close(socket);
Expand Down Expand Up @@ -343,8 +344,7 @@ Napi::Value Socket::Bind(const Napi::CallbackInfo& info) {

state = Socket::State::Blocked;
auto res = Napi::Promise::Deferred::New(Env());
auto run_ctx =
std::make_shared<AddressContext>(info[0].As<Napi::String>().Utf8Value());
auto run_ctx = std::make_shared<AddressContext>(info[0].As<Napi::String>());

auto status = UvQueue(Env(),
[=]() {
Expand Down Expand Up @@ -395,8 +395,7 @@ Napi::Value Socket::Unbind(const Napi::CallbackInfo& info) {

state = Socket::State::Blocked;
auto res = Napi::Promise::Deferred::New(Env());
auto run_ctx =
std::make_shared<AddressContext>(info[0].As<Napi::String>().Utf8Value());
auto run_ctx = std::make_shared<AddressContext>(info[0].As<Napi::String>());

auto status = UvQueue(Env(),
[=]() {
Expand Down
82 changes: 24 additions & 58 deletions test/unit/context-process-exit-test.ts
Original file line number Diff line number Diff line change
@@ -1,59 +1,62 @@
/* tslint:disable: no-unused-expression */
import * as zmq from "../../src"

import {assert} from "chai"
import {spawn} from "child_process"
import {createProcess} from "./helpers"

/* This file is in JavaScript instead of TypeScript because most code is
being evaluated with toString() and executed in a sub-process. */
describe("context process exit", function() {
describe("with default context", function() {
it("should occur when sockets are closed", async function() {
this.slow(200)
await ensureExit(function() {
const zmq = require(".")
const code = await createProcess(() => {
const socket1 = new zmq.Dealer
socket1.close()
const socket2 = new zmq.Router
socket2.close()
})

assert.equal(code, 0)
})

it("should occur when sockets are not closed", async function() {
this.slow(200)
await ensureExit(function() {
const zmq = require(".")
const code = await createProcess(() => {
const socket1 = new zmq.Dealer
const socket2 = new zmq.Router
})

assert.equal(code, 0)
})

it("should not occur when sockets are open and polling", async function() {
this.slow(750)
await ensureNoExit(function() {
const zmq = require(".")
const code = await createProcess(() => {
const socket1 = new zmq.Dealer
socket1.connect("inproc://foo")
socket1.receive()
})

assert.equal(code, -1)
})
})

describe("with custom context", function() {
it("should occur when sockets are closed", async function() {
this.slow(200)
await ensureExit(function() {
const zmq = require(".")
const code = await createProcess(() => {
const context = new zmq.Context
const socket1 = new zmq.Dealer({context})
socket1.close()
const socket2 = new zmq.Router({context})
socket2.close()
})

assert.equal(code, 0)
})

it("should occur when sockets are closed and context is gced", async function() {
this.slow(200)
await ensureExit(function() {
const zmq = require(".")
const code = await createProcess(() => {
function run() {
const context = new zmq.Context
const socket1 = new zmq.Dealer({context})
Expand All @@ -65,68 +68,31 @@ describe("context process exit", function() {
run()
global.gc()
})

assert.equal(code, 0)
})

it("should occur when sockets are not closed", async function() {
this.slow(200)
await ensureExit(function() {
const zmq = require(".")
const code = await createProcess(() => {
const context = new zmq.Context
const socket1 = new zmq.Dealer({context})
const socket2 = new zmq.Router({context})
})

assert.equal(code, 0)
})

it("should not occur when sockets are open and polling", async function() {
this.slow(750)
await ensureNoExit(function() {
const zmq = require(".")
const code = await createProcess(() => {
const context = new zmq.Context
const socket1 = new zmq.Dealer({context})
socket1.connect("inproc://foo")
socket1.receive()
})
})
})
})

async function ensureExit(fn: () => void): Promise<void> {
return new Promise((resolve) => {
const child = spawn(process.argv[0], ["--expose_gc"])
child.stdin.write(`(${fn})()`)
child.stdin.end()

child.stdout.on("data", (data: Buffer) => console.log(data.toString()))
child.stderr.on("data", (data: Buffer) => console.error(data.toString()))

child.on("close", (code: number) => {
assert.equal(code, 0)
resolve()
assert.equal(code, -1)
})

setTimeout(() => {
resolve()
child.kill()
}, 2000)
})
}

async function ensureNoExit(fn: () => void): Promise<void> {
return new Promise((resolve, reject) => {
const child = spawn(process.argv[0], ["--expose_gc"])
child.stdin.write(`(${fn})()`)
child.stdin.end()

child.stdout.on("data", (data: Buffer) => console.log(data.toString()))
child.stderr.on("data", (data: Buffer) => console.error(data.toString()))

child.on("close", (code: number) => {
reject(new Error(`Exit with code ${code}`))
})

setTimeout(() => {
resolve()
child.kill()
}, 500)
})
}
})
34 changes: 33 additions & 1 deletion test/unit/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import * as path from "path"
import * as semver from "semver"

import {spawn} from "child_process"
import {Worker} from "worker_threads"

import * as zmq from "../../src"
Expand Down Expand Up @@ -51,7 +53,7 @@ export function createWorker<T, D extends {}>(
parentPort.postMessage(msg)
}
run().then(global.gc)
run()
`

const worker = new Worker(src, {
Expand All @@ -71,3 +73,33 @@ export function createWorker<T, D extends {}>(
})
})
}

export function createProcess(fn: () => void): Promise<number> {
const src = `
const zmq = require(${JSON.stringify(path.resolve(__dirname, "../.."))})
const fn = ${fn.toString()}
fn()
`

const child = spawn(process.argv[0], ["--expose_gc"])
child.stdin.write(src)
child.stdin.end()

child.stdout.on("data", (data: Buffer) => console.log(data.toString()))
child.stderr.on("data", (data: Buffer) => console.error(data.toString()))

return new Promise((resolve, reject) => {
child.on("close", (code: number, signal: string) => {
if (signal != null) {
reject(new Error(`Child exited with ${signal}`))
} else {
resolve(code)
}
})

setTimeout(() => {
resolve(-1)
child.kill()
}, 1000)
})
}
42 changes: 42 additions & 0 deletions test/unit/socket-process-exit-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import * as zmq from "../../src"

import {assert} from "chai"
import {createProcess, uniqAddress} from "./helpers"

describe("socket process exit", function() {
it.skip("should occur cleanly when sending in exit hook", async function() {
this.slow(200)
const code = await createProcess(async () => {
const sockA = new zmq.Pair
const sockB = new zmq.Pair
await sockA.bind("inproc://test-1")
sockB.connect("inproc://test-1")

process.on("exit", () => {
console.log("hook")
sockB.receive()
sockA.send("foo")
})
})

assert.equal(code, 0)
})

it("should occur cleanly when reading events", async function() {
this.slow(200)
const code = await createProcess(() => {
const sock = new zmq.Dealer

async function readEvents() {
const events = []
for await (const event of sock.events) {
events.push(event)
}
}

readEvents()
})

assert.equal(code, 0)
})
})

0 comments on commit 61da93b

Please sign in to comment.