Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Web Locks API initial implementation #417

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
dist
coverage
.nyc_output
locks.js
197 changes: 197 additions & 0 deletions lib/locks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
'use strict';

const { EventEmitter } = require('events');
const { Worker, isMainThread, parentPort } = require('worker_threads');

const threads = new Set();

const LOCKED = 0;
const UNLOCKED = 1;

const sendMessage = message => {
if (isMainThread) {
for (const thread of threads) {
thread.worker.postMessage(message);
}
} else {
parentPort.postMessage(message);
}
};

class Lock {
constructor(name, mode = 'exclusive', buffer = null) {
this.name = name;
this.mode = mode; // 'exclusive' or 'shared'
this.queue = [];
this.owner = false;
this.trying = false;
const initial = !buffer;
this.buffer = initial ? new SharedArrayBuffer(4) : buffer;
this.flag = new Int32Array(this.buffer, 0, 1);
if (initial) Atomics.store(this.flag, 0, UNLOCKED);
}

enter(lock) {
this.queue.push(lock);
this.trying = true;
return this.tryEnter();
}

tryEnter() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we can merge tryEnter and enterIfAvailable by adding lock argument and a few checks?

if (this.queue.length === 0) return undefined;
const prev = Atomics.exchange(this.flag, 0, LOCKED);
if (prev === LOCKED) return undefined;
this.owner = true;
tshemsedinov marked this conversation as resolved.
Show resolved Hide resolved
this.trying = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this.trying needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can use mutex.queue.length > 0 instead

const lock = this.queue.shift();
return lock.callback(lock).finally(() => {
this.leave();
});
}

leave() {
if (!this.owner) return;
Atomics.store(this.flag, 0, UNLOCKED);
this.owner = false;
sendMessage({ kind: 'leave', name: this.name });
this.tryEnter();
}
}

class LockManagerSnapshot {
constructor(resources) {
const held = [];
const pending = [];
this.held = held;
this.pending = pending;

for (const lock of resources) {
if (lock.queue.length > 0) {
pending.push(...lock.queue);
}
if (lock.owner) {
held.push(lock);
}
}
}
}

class LockManager {
constructor() {
this.collection = new Map();
}

async request(name, options, callback) {
if (typeof options === 'function') {
callback = options;
options = {};
}
const { mode = 'exclusive', signal = null } = options;

let lock = this.collection.get(name);
if (lock) {
if (mode === 'exclusive') {
return new Promise(resolve => {
lock.queue.push([callback, resolve]);
});
}
} else {
lock = new Lock(name, mode);
this.collection.set(name, lock);
const { buffer } = lock;
sendMessage({ kind: 'create', name, mode, buffer });
}

const finished = callback(lock);
let aborted = null;
if (signal) {
aborted = new Promise((resolve, reject) => {
signal.on('abort', reject);
});
await Promise.race([finished, aborted]);
} else {
await finished;
}

let next = lock.queue.pop();
while (next) {
const [handler, resolve] = next;
await handler(lock);
resolve();
next = lock.queue.pop();
}
this.collection.delete(name);
return undefined;
}

query() {
const snapshot = new LockManagerSnapshot();
return Promise.resolve(snapshot);
}
}

class AbortError extends Error {
constructor(message) {
super(message);
this.name = 'AbortError';
}
}

class AbortSignal extends EventEmitter {
constructor() {
super();
this.aborted = false;
this.on('abort', () => {
this.aborted = true;
});
}
}

class AbortController {
constructor() {
this.signal = new AbortSignal();
}

abort() {
const error = new AbortError('The request was aborted');
this.signal.emit('abort', error);
}
}

const locks = new LockManager();

const receiveMessage = message => {
const { kind, name, mode, buffer } = message;
if (kind === 'create') {
const lock = new Lock(name, mode, buffer);
locks.collection.set(name, lock);
} else if (kind === 'leave') {
for (const lock of locks.collection) {
if (lock.name === name && lock.trying) {
lock.tryEnter();
}
}
}
};

if (!isMainThread) {
parentPort.on('message', receiveMessage);
}

class Thread {
constructor(filename, options) {
const worker = new Worker(filename, options);
this.worker = worker;
threads.add(this);
worker.on('message', message => {
for (const thread of threads) {
if (thread.worker !== worker) {
thread.worker.postMessage(message);
}
}
receiveMessage(message);
});
}
}

module.exports = { locks, Thread, AbortController };
4 changes: 4 additions & 0 deletions metasync.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,9 @@ if (nodeVerion >= 10) {
submodules.push(require('./lib/async-iterator'));
}

if (nodeVerion >= 11) {
submodules.push(require('./lib/locks'));
}

const { compose } = submodules[0];
module.exports = Object.assign(compose, ...submodules);
29 changes: 29 additions & 0 deletions test/locks.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
'use strict';

const common = require('@metarhia/common');
const nodeVerion = common.between(process.version, 'v', '.');

if (nodeVerion >= 11) {
const { isMainThread } = require('worker_threads');
const { locks, Thread } = require('..');
const metatests = require('metatests');

const sleep = msec => new Promise(res => setTimeout(res, msec));

if (isMainThread) {
metatests.test('locks: enter and leave', test => {
new Thread(__filename);
new Thread(__filename);

setTimeout(() => {
locks.request('A', async lock => {
test.end();
});
}, 100);
});
} else {
locks.request('A', async lock => {
await sleep(100);
});
}
}