Skip to content

Commit

Permalink
Worker-based Async API with await/Promise support (or plain callbacks) (
Browse files Browse the repository at this point in the history
Eyevinn#6)

* fix lint errors in eslint config :)

* index.d.s: add /* eslint-disable @typescript-eslint/triple-slash-reference */

* improve eslint calling script

* add async-worker version of the SRT JS API with await/Promise support
+ potentially out-of-order RPC-back/result-dequeuing is possible
with a type of call-ID generator and callback-map that we prototyped
(but not used atm since the worker is only doing sync/blocking internally
with the current SRT lib binding).

* add various examples for async API ("classic" callbacks / Promise / await)

* add .eslintignore (should go with commit where we just call "eslint .")

* index.js: add missing semi

* async.js: fix method name litteral in epollUWait

* async.js: allow for accept method to use a timeout opt (defaults to false),
and a custom timeout value option (default to default timeout),
which can potentially be set differently than the general default timeout.
+ make the timeout value a static class property so that it can be
user-defined module-load wide. defaults to constant in module top scope.

* async.js: fix for a rejected promise, make sure we don't resolve anymore
+ add a custom timeout value argument to _createAsyncWorkPromise

* async.js: rm an experiment

* fix a lint error in example

* add types for async api and fix some details on binding API types

* export async API to index

* async.js: add some missing docs for private methods

* include async types in index
  • Loading branch information
tchakabam authored Aug 3, 2020
1 parent fdf396d commit 87884c6
Show file tree
Hide file tree
Showing 12 changed files with 517 additions and 35 deletions.
4 changes: 4 additions & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
build
deps
node_modules
tsc-lib
46 changes: 23 additions & 23 deletions .eslintrc.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
module.exports = {
root: true,
"env": {
"commonjs": true,
"es2020": true,
"node": true,
"jasmine": true
},
"extends": [
"eslint:recommended",
"plugin:@typescript-eslint/recommended",
"plugin:jasmine/recommended"
],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 11
},
"plugins": ["@typescript-eslint", "jasmine"],
"rules": {
"indent": ["error", 2],
"semi": "error",
"@typescript-eslint/no-var-requires": 0,
"no-constant-condition": 0
}
root: true,
"env": {
"commonjs": true,
"es2020": true,
"node": true,
"jasmine": true
},
"extends": [
"eslint:recommended",
"plugin:@typescript-eslint/recommended",
"plugin:jasmine/recommended"
],
"parser": "@typescript-eslint/parser",
"parserOptions": {
"ecmaVersion": 11
},
"plugins": ["@typescript-eslint", "jasmine"],
"rules": {
"indent": ["error", 2],
"semi": "error",
"@typescript-eslint/no-var-requires": 0,
"no-constant-condition": 0
}
};
25 changes: 25 additions & 0 deletions examples/async-srt-await.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
const { AsyncSRT } = require('../src/async.js');

const asyncSrt = new AsyncSRT();

(async function() {
const socket = await asyncSrt.createSocket(false);
console.log('createSocket() result:', socket);
let result = await asyncSrt.bind(socket, "0.0.0.0", 1234);
console.log('bind() result:', result);
result = await asyncSrt.listen(socket, 2);
console.log('listen() result:', result);

awaitConnections(socket);

})();

async function awaitConnections(socket) {
console.log('Awaiting incoming client connection ...');
const fd = await asyncSrt.accept(socket);
console.log('New incoming client fd:', fd);
}

setInterval(() => {
console.log('Doing other stuff in the meantime ... :)');
}, 1000);
32 changes: 32 additions & 0 deletions examples/async-srt-promises.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"use strict";

const { AsyncSRT } = require('../src/async.js');

const asyncSrt = new AsyncSRT();

let mySocket;

asyncSrt.createSocket(false)
.catch((err) => console.error(err))
.then((result) => {
console.log('createSocket:', result);
mySocket = result;
return result;
})
.then((socket) => asyncSrt.bind(socket, "0.0.0.0", 1234))
.then((result) => {
if (result !== 0) {
throw new Error('Failed bind');
}
console.log('Bind success');
return asyncSrt.listen(mySocket, 2);
})
.then((result) => {
if (!result) {
console.log("Listen success");
} else {
throw new Error('SRT listen error: ' + result);
}
});


25 changes: 25 additions & 0 deletions examples/async-srt.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"use strict";

const { AsyncSRT } = require('../src/async.js');

const asyncSrt = new AsyncSRT();

asyncSrt.createSocket(false, (result) => {
console.log('createSocket:', result);
const socket = result;
asyncSrt.bind(socket, "0.0.0.0", 1234, (result) => {
if (result !== 0) {
console.log('Failed bind');
} else {
console.log('Bind success');
asyncSrt.listen(socket, 2, (result) => {
if (!result) {
console.log("Listen success");
} else {
console.log(result);
}
});
}
});
});

2 changes: 2 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
/* eslint-disable @typescript-eslint/triple-slash-reference */
/// <reference path="./types/srt-api.d.ts" />
/// <reference path="./types/srt-stream.d.ts" />
/// <reference path="./types/srt-server.d.ts" />
/// <reference path="./types/srt-api-async.d.ts" />

export * from "srt";
12 changes: 7 additions & 5 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
const LIB = require('./build/Release/node_srt.node');
const { SRT } = require('./build/Release/node_srt.node');
const Server = require('./src/server.js');
const { SRTReadStream, SRTWriteStream } = require('./src/stream.js');
const { AsyncSRT } = require('./src/async');

module.exports = {
SRT: LIB.SRT,
Server: Server,
SRT,
Server,
SRTReadStream,
SRTWriteStream
}
SRTWriteStream,
AsyncSRT,
};
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
"rebuild": "node-gyp rebuild",
"clean": "node-gyp clean",
"test": "$(npm bin)/jasmine",
"lint": "eslint src examples types --ext .js --ext .ts",
"lint": "eslint . --ext .js --ext .ts",
"check-tsc": "./node_modules/.bin/tsc examples/srt.ts --outDir ./tsc-lib",
"postversion": "git push && git push --tags"
},
Expand Down
41 changes: 41 additions & 0 deletions src/async-worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
const {
Worker, isMainThread, parentPort, workerData
} = require('worker_threads');

const { SRT } = require('../build/Release/node_srt.node');

if (isMainThread) {
throw new Error("Worker module can not load on main thread");
}

(function run() {
const libSRT = new SRT();
parentPort.on('message', (data) => {
if (!data.method) {
throw new Error('Worker message needs `method` property');
}
/*
if (!data.workId) {
throw new Error('Worker message needs `workId` property');
}
*/
let result = libSRT[data.method].apply(libSRT, data.args);
// TODO: see if we can do this using SharedArrayBuffer for example,
// or just leveraging Transferable objects capabilities ... ?
// FIXME: Performance ... ?
if (result instanceof Buffer) {
const buf = Buffer.allocUnsafe(result.length);
result.copy(buf);
result = buf;
}
parentPort.postMessage({
// workId: data.workId,
timestamp: data.timestamp,
result
});
});
})();




Loading

0 comments on commit 87884c6

Please sign in to comment.