Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Worker thread and Node 13 support #350

Merged
merged 8 commits into from
Nov 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 4 additions & 5 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ jobs:
# Test recent Node versions.
- os: linux
node_js: "12"
env: ZMQ_DRAFT=true INCLUDE_COMPAT_TESTS=true
# Skip GC tests due to https://github.com/node-ffi-napi/weak-napi/issues/16
env: ZMQ_DRAFT=true SKIP_GC_TESTS=true INCLUDE_COMPAT_TESTS=true

- os: linux
node_js: "13"
env: ZMQ_DRAFT=true
# Skip GC tests due to https://github.com/node-ffi-napi/weak-napi/issues/16
env: ZMQ_DRAFT=true SKIP_GC_TESTS=true

## PREBUILD STAGE

Expand Down Expand Up @@ -110,9 +112,6 @@ jobs:
script: script/ci/package.sh

fast_finish: true
allow_failures:
- os: linux
node_js: "13"

stages:
- name: test
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,9 @@ pub.bind("tcp://*:3456", err => {

In order to develop and test the library, you'll need the following:

* A working C/C++ compiler toolchain with make
* Python 2.7
* Node.js 10+
* A working C/C++ compiler toolchain with make
* Python 2.7 (or Python 3 with Node 12.13+)
* CMake 2.8+
* curl
* clang-format is strongly recommended
Expand Down
2 changes: 1 addition & 1 deletion binding.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
'target_name': 'zeromq',
'dependencies': ['libzmq'],
'sources': [
'src/binding.cc',
'src/context.cc',
'src/incoming_msg.cc',
'src/module.cc',
'src/observer.cc',
'src/outgoing_msg.cc',
'src/proxy.cc',
Expand Down
3 changes: 2 additions & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
},
"scripts": {
"majordomo": "ts-node majordomo",
"queue": "ts-node queue"
"queue": "ts-node queue",
"threaded-worker": "ts-node threaded-worker"
}
}
131 changes: 131 additions & 0 deletions examples/threaded-worker/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Distributed processing with worker threads

## Running this example

To run this example, install the example project depedencies and run the threaded worker example script with `yarn`:

```
> yarn install
> yarn threaded-worker
```

## Expected behaviour

The example will start worker threads which each encodes a character with the caesar cipher 200 million + 1 times. The output will be similar to this:

```
starting 8 worker threads
---
sending input 'Hello world!'
received work 'H' at 0
received work ' ' at 5
received work 'w' at 6
received work 'o' at 4
received work 'e' at 1
received work 'o' at 7
received work 'l' at 3
received work 'l' at 2
finished work ' ' -> ' ' at 5
finished work 'o' -> 'b' at 7
finished work 'o' -> 'b' at 4
finished work 'w' -> 'j' at 6
finished work 'l' -> 'y' at 3
received work '!' at 11
finished work '!' -> '!' at 11
finished work 'l' -> 'y' at 2
received work 'd' at 10
finished work 'd' -> 'q' at 10
finished work 'H' -> 'U' at 0
received work 'r' at 8
finished work 'r' -> 'e' at 8
received output 'Hello world!' -> 'Uryyb jbeyq!' in 3s
---
sending input 'Would you like more sockets?'
received work 'u' at 2
received work 'W' at 0
received work 'l' at 3
received work 'o' at 1
received work 'o' at 7
received work 'd' at 4
finished work 'e' -> 'r' at 1
received work 'y' at 6
received work 'l' at 9
finished work 'l' -> 'y' at 9
received work ' ' at 5
finished work ' ' -> ' ' at 5
received work 'e' at 13
finished work 'e' -> 'r' at 13
received work 'o' at 21
finished work 'o' -> 'b' at 21
finished work 'y' -> 'l' at 6
received work ' ' at 14
finished work ' ' -> ' ' at 14
received work 'c' at 22
finished work 'c' -> 'p' at 22
finished work 'l' -> 'y' at 3
received work 'i' at 11
finished work 'i' -> 'v' at 11
received work ' ' at 19
finished work ' ' -> ' ' at 19
received work '?' at 27
finished work '?' -> '?' at 27
finished work 'd' -> 'q' at 4
received work 'k' at 12
finished work 'k' -> 'x' at 12
received work 's' at 20
finished work 's' -> 'f' at 20
finished work 'o' -> 'b' at 7
received work 'm' at 15
finished work 'm' -> 'z' at 15
received work 'k' at 23
finished work 'k' -> 'x' at 23
finished work 'o' -> 'b' at 1
received work ' ' at 9
finished work ' ' -> ' ' at 9
received work 'r' at 17
finished work 'r' -> 'e' at 17
received work 't' at 25
finished work 't' -> 'g' at 25
finished work 'W' -> 'J' at 0
received work 'u' at 8
finished work 'u' -> 'h' at 8
received work 'o' at 16
finished work 'o' -> 'b' at 16
received work 'e' at 24
finished work 'e' -> 'r' at 24
received output 'Would you like more sockets?' -> 'Jbhyq lbh yvxr zber fbpxrgf?' in 5s
---
sending input 'Yes please.'
received work 'Y' at 0
received work 'l' at 5
finished work 'u' -> 'h' at 2
received work 'p' at 4
received work 'e' at 1
received work 'a' at 7
received work ' ' at 3
received work 's' at 2
finished work ' ' -> ' ' at 3
finished work 'a' -> 'n' at 7
received work 'l' at 10
finished work 'l' -> 'y' at 10
received work 'e' at 18
finished work 'e' -> 'r' at 18
received work 's' at 26
finished work 's' -> 'f' at 26
received work 'e' at 6
finished work 'e' -> 'r' at 6
finished work 'p' -> 'c' at 4
finished work 'l' -> 'y' at 5
finished work 's' -> 'f' at 2
received work '.' at 10
finished work '.' -> '.' at 10
finished work 'Y' -> 'L' at 0
received work 's' at 8
finished work 's' -> 'f' at 8
received output 'Yes please.' -> 'Lrf cyrnfr.' in 2s
---
finished work 'e' -> 'r' at 1
received work 'e' at 9
finished work 'e' -> 'r' at 9
all workers stopped
```
26 changes: 26 additions & 0 deletions examples/threaded-worker/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/* tslint:disable: no-console */
import {Processor} from "./processor"

async function main() {
const processor = new Processor()
await transform(processor, "Hello world!")
await transform(processor, "Would you like more sockets?")
await transform(processor, "Yes please.")
await processor.stop()
}

async function transform(processor: Processor, input: string) {
console.log(`sending input '${input}'`)

const start = process.hrtime()
const output = await processor.process(input)
const end = process.hrtime(start)

console.log(`received output '${input}' -> '${output}' in ${end[0]}s`)
console.log(`---`)
}

main().catch((err) => {
console.error(err)
process.exit(1)
})
61 changes: 61 additions & 0 deletions examples/threaded-worker/processor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/* tslint:disable: no-console */
import {cpus} from "os"
import {Publisher, Pull, Push} from "zeromq"

import {ThreadedWorker} from "./threaded-worker"

export class Processor {
threads: number

input = new Push()
output = new Pull()
signal = new Publisher()

init: Promise<any>
exit: Promise<any>

constructor(threads: number = cpus().length) {
console.log(`starting ${threads} worker threads`)
console.log(`---`)

this.threads = threads
this.init = Promise.all([
this.input.bind("inproc://input"),
this.output.bind("inproc://output"),
this.signal.bind("inproc://signal"),
new Promise((resolve) => setTimeout(resolve, 100)),
])

this.exit = Promise.all([
ThreadedWorker.spawn(this.threads),
])
}

async process(str: string): Promise<string> {
await this.init

const input = str.split("")
for (const req of input.entries()) {
await this.input.send(req.map((pt) => pt.toString()))
}

const output: string[] = Array.from({length: input.length})
for await (const [pos, res] of this.output) {
output[parseInt(pos.toString(), 10)] = res.toString()
if (output.every((el) => el !== undefined)) break
}

return output.join("")
}

async stop() {
await Promise.all([
this.signal.send("stop"),
this.input.unbind("inproc://input"),
this.output.unbind("inproc://output"),
this.signal.unbind("inproc://signal"),
])

await this.exit
}
}
92 changes: 92 additions & 0 deletions examples/threaded-worker/threaded-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/* tslint:disable: no-console */
import {Worker} from "worker_threads"
import * as zmq from "zeromq"

export class ThreadedWorker {
static async spawn(threads: number) {
const workers = Array.from({length: threads}).map(() => {
return new Promise((resolve, reject) => {
const src = `
const zmq = require("zeromq")
${ThreadedWorker.toString()}
new ThreadedWorker().run()
`

new Worker(src, {eval: true}).on("exit", (code) => {
if (code === 0) {
resolve()
} else {
reject(new Error(`Worker stopped with exit code ${code}`))
}
})
})
})

await Promise.all(workers)
console.log(`all workers stopped`)
}

/* Queue only 1 incoming message. */
input = new zmq.Pull({receiveHighWaterMark: 1})
output = new zmq.Push()
signal = new zmq.Subscriber()

shift = 13
maxDelay = 2000 /* Average of 1s. */

constructor() {
this.input.connect("inproc://input")
this.output.connect("inproc://output")

this.signal.connect("inproc://signal")
this.signal.subscribe()

const listen = async () => {
for await (const [sig] of this.signal) {
if (sig.toString() === "stop") this.stop()
}
}

listen()
}

async stop() {
this.input.close()
this.output.close()
this.signal.close()
}

/* Loop over input and produce output. */
async run() {
for await (const [pos, req] of this.input) {
if (req.length !== 1) {
console.log(`skipping invalid '${req}'`)
continue
}

console.log(`received work '${req}' at ${pos}`)

const res = await this.work(req.toString())
await this.output.send([pos, res])

console.log(`finished work '${req}' -> '${res}' at ${pos}`)
}
}

/* Do the actual Caesar shift. */
async work(req: string): Promise<string> {
// await new Promise((resolve) => setTimeout(resolve, Math.random() * this.maxDelay))

let char = req.charCodeAt(0)

for (let i = 0; i < 200000001; i++) {
if (char >= 65 && char <= 90) {
char = (char - 65 + this.shift) % 26 + 65
} else if (char >= 97 && char <= 122) {
char = (char - 97 + this.shift) % 26 + 97
}
}

return String.fromCharCode(char)
}
}
4 changes: 2 additions & 2 deletions examples/tsconfig.json
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
{
"include": ["*"],
"compilerOptions": {
"target": "es2017",
"target": "es2018",
"module": "commonjs",
"lib": ["es2017", "esnext.asynciterable"],
"lib": ["es2018", "esnext.asynciterable"],
"strict": true,
}
}
Loading