Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/generic queues #1

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4,033 changes: 4,033 additions & 0 deletions package-lock.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@
"prettier": "^2.2.1",
"prettier-plugin-import-sort": "^0.0.6",
"rollup": "^2.38.5",
"tinyqueue": "^2.0.3",
"ts-node": "^9.1.1",
"typescript": "^4.1.3"
},
Expand Down
18 changes: 9 additions & 9 deletions src/Mutex.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,19 @@
import MutexInterface from './MutexInterface';
import Semaphore from './Semaphore';
import Semaphore, { QueueEntry, QueueLike } from './Semaphore';

class Mutex implements MutexInterface {
constructor(cancelError?: Error) {
this._semaphore = new Semaphore(1, cancelError);
class Mutex<U = void> implements MutexInterface<U> {
constructor(cancelError?: Error, queue?: QueueLike<QueueEntry<U>>) {
this._semaphore = new Semaphore<U>(1, cancelError, queue);
}

async acquire(): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire();
async acquire(data: U): Promise<MutexInterface.Releaser> {
const [, releaser] = await this._semaphore.acquire(data);

return releaser;
}

runExclusive<T>(callback: MutexInterface.Worker<T>): Promise<T> {
return this._semaphore.runExclusive(() => callback());
runExclusive<T>(callback: MutexInterface.Worker<T>, data: U): Promise<T> {
return this._semaphore.runExclusive(() => callback(), data);
}

isLocked(): boolean {
Expand All @@ -29,7 +29,7 @@ class Mutex implements MutexInterface {
return this._semaphore.cancel();
}

private _semaphore: Semaphore;
private _semaphore: Semaphore<U>;
}

export default Mutex;
6 changes: 3 additions & 3 deletions src/MutexInterface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
interface MutexInterface {
acquire(): Promise<MutexInterface.Releaser>;
interface MutexInterface<U = void> {
acquire(data: U): Promise<MutexInterface.Releaser>;

runExclusive<T>(callback: MutexInterface.Worker<T>): Promise<T>;
runExclusive<T>(callback: MutexInterface.Worker<T>, data: U): Promise<T>;

isLocked(): boolean;

Expand Down
28 changes: 19 additions & 9 deletions src/Semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,44 @@
import { E_CANCELED } from './errors';
import SemaphoreInterface from './SemaphoreInterface';

interface QueueEntry {
export interface QueueEntry<T = void> {
resolve: (ticket: [number, SemaphoreInterface.Releaser]) => void;
reject: (err: Error) => void;
data: T;
}

class Semaphore implements SemaphoreInterface {
constructor(private _maxConcurrency: number, private _cancelError: Error = E_CANCELED) {
export interface QueueLike<T> {
push: (...items: T[]) => number;
shift: () => T | undefined;
forEach: (callbackfn: (value: T, index: number, array: T[]) => void, thisArg?: unknown) => void;
length: number;
}

export type QueueLikeSemaphore = QueueLike<QueueEntry>;
export type QueueLikeArray = QueueEntry[];

class Semaphore<U = void> implements SemaphoreInterface<U> {
constructor(private _maxConcurrency: number, private _cancelError: Error = E_CANCELED, private _queue: QueueLike<QueueEntry<U>> = <QueueEntry<U>[]>[]) {
if (_maxConcurrency <= 0) {
throw new Error('semaphore must be initialized to a positive value');
}

this._value = _maxConcurrency;
}

acquire(): Promise<[number, SemaphoreInterface.Releaser]> {
acquire(data: U): Promise<[number, SemaphoreInterface.Releaser]> {
const locked = this.isLocked();
const ticketPromise = new Promise<[number, SemaphoreInterface.Releaser]>((resolve, reject) =>
this._queue.push({ resolve, reject })
this._queue.push({ resolve, reject, data })
);

if (!locked) this._dispatch();

return ticketPromise;
}

async runExclusive<T>(callback: SemaphoreInterface.Worker<T>): Promise<T> {
const [value, release] = await this.acquire();
async runExclusive<T>(callback: SemaphoreInterface.Worker<T>, data: U): Promise<T> {
const [value, release] = await this.acquire(data);

try {
return await callback(value);
Expand Down Expand Up @@ -58,7 +69,7 @@ class Semaphore implements SemaphoreInterface {

cancel(): void {
this._queue.forEach((ticket) => ticket.reject(this._cancelError));
this._queue = [];
this._queue.length = 0;
}

private _dispatch(): void {
Expand All @@ -79,7 +90,6 @@ class Semaphore implements SemaphoreInterface {
nextTicket.resolve([this._value--, this._currentReleaser]);
}

private _queue: Array<QueueEntry> = [];
private _currentReleaser: SemaphoreInterface.Releaser | undefined;
private _value: number;
}
Expand Down
6 changes: 3 additions & 3 deletions src/SemaphoreInterface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
interface SemaphoreInterface {
acquire(): Promise<[number, SemaphoreInterface.Releaser]>;
interface SemaphoreInterface<U = void> {
acquire(data: U): Promise<[number, SemaphoreInterface.Releaser]>;

runExclusive<T>(callback: SemaphoreInterface.Worker<T>): Promise<T>;
runExclusive<T>(callback: SemaphoreInterface.Worker<T>, data: U): Promise<T>;

isLocked(): boolean;

Expand Down
14 changes: 7 additions & 7 deletions src/withTimeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import { E_TIMEOUT } from './errors';
import MutexInterface from './MutexInterface';
import SemaphoreInterface from './SemaphoreInterface';

export function withTimeout(mutex: MutexInterface, timeout: number, timeoutError?: Error): MutexInterface;
export function withTimeout(semaphore: SemaphoreInterface, timeout: number, timeoutError?: Error): SemaphoreInterface;
export function withTimeout<U = void>(mutex: MutexInterface<U>, timeout: number, timeoutError?: Error): MutexInterface<U>;
export function withTimeout<U = void>(semaphore: SemaphoreInterface<U>, timeout: number, timeoutError?: Error): SemaphoreInterface<U>;
// eslint-disable-next-line @typescript-eslint/explicit-module-boundary-types
export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout: number, timeoutError = E_TIMEOUT) {
export function withTimeout<U = void>(sync: MutexInterface<U> | SemaphoreInterface<U>, timeout: number, timeoutError = E_TIMEOUT) {
return {
acquire: (): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> =>
acquire: (data: U): Promise<MutexInterface.Releaser | [number, SemaphoreInterface.Releaser]> =>
new Promise(async (resolve, reject) => {
let isTimeout = false;

Expand All @@ -17,7 +17,7 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
}, timeout);

try {
const ticket = await sync.acquire();
const ticket = await sync.acquire(data);

if (isTimeout) {
const release = Array.isArray(ticket) ? ticket[1] : ticket;
Expand All @@ -36,11 +36,11 @@ export function withTimeout(sync: MutexInterface | SemaphoreInterface, timeout:
}
}),

async runExclusive<T>(callback: (value?: number) => Promise<T> | T): Promise<T> {
async runExclusive<T>(callback: (value?: number) => Promise<T> | T, data: U): Promise<T> {
let release: () => void = () => undefined;

try {
const ticket = await this.acquire();
const ticket = await this.acquire(data);

if (Array.isArray(ticket)) {
release = ticket[1];
Expand Down
2 changes: 1 addition & 1 deletion test/mutex.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as assert from 'assert';
import assert from 'assert';

import { InstalledClock, install } from '@sinonjs/fake-timers';

Expand Down
109 changes: 109 additions & 0 deletions test/semaphore-queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import Semaphore, { QueueEntry, QueueLike } from '../src/Semaphore';
import TinyQueue from "tinyqueue";
import { semaphoreSuite } from './semaphore';
import { mutexSuite } from './mutex';
import Mutex from '../src/Mutex';
import assert from 'assert';
import { InstalledClock, install } from '@sinonjs/fake-timers';
import SemaphoreInterface from '../src/SemaphoreInterface';

export default class HeapHelper<T> implements QueueLike<T> {
constructor(private _queue: TinyQueue<T> = new TinyQueue()) {
}

pop = (): T | undefined => this._queue.pop();

push = (...items: T[]): number => {
for (const item of items) {
this._queue.push(item);
}
return items.length;
}

shift = this.pop;

unshift = this.push;

get length(): number {
return this._queue.length;
}

set length(n: number) {
this._queue.length = n;
this._queue.data.length = n;
}

forEach = (callbackfn: (value: T, index: number, array: T[]) => void): void => {
this._queue.data.forEach((value, index, array) => {
callbackfn(value, index, array);
});
}

toString = (): string => JSON.stringify(this._queue.data);
}

suite('Semaphore with Priority Queue', () => {
const priorityQueue = new HeapHelper<QueueEntry>();
semaphoreSuite((maxConcurrency: number, err?: Error) => new Semaphore(maxConcurrency, err, priorityQueue));

// These tests validate the expected behavior of TinyQueue + Semaphore.
suite('TinyQueue Implementation Tests', () => {
let semaphore: SemaphoreInterface<number>;
let clock: InstalledClock;
const maxPriorityQueue = new TinyQueue<QueueEntry<number>>([], (a, b) => b.data - a.data);
const heap = new HeapHelper<QueueEntry<number>>(maxPriorityQueue);

setup(() => {
clock = install();
semaphore = new Semaphore(2, undefined, heap);
});

teardown(() => clock.uninstall());

test('Semaphore releases higher priority tasks first', async () => {

const [, release1] = await semaphore.acquire(0);
const [,] = await semaphore.acquire(2);
let prio5Finished = false;
let prio1Finished = false;
let prio10Finished = false;

(async () => {
await semaphore.acquire(5);
prio5Finished = true;
})();

(async () => {
await semaphore.acquire(1);
prio1Finished = true;
})();

(async () => {
await semaphore.acquire(10);
prio10Finished = true;
})();

release1();
await clock.tickAsync(1);

assert(prio5Finished === false, 'Priority 5 finished before Priority 10 AND Priority 1.');
assert(prio1Finished === false, 'Priority 1 finished before Priority 10.');
//@ts-expect-error Typescript doesn't know if a promise will run before this.
assert(prio10Finished === true, 'Priority 10 was not completed after semaphore was released.');
});
})
});

suite('Mutex with Priority Queue', () => {
const priorityQueue = new HeapHelper<QueueEntry>();
mutexSuite((err?: Error) => new Mutex(err, priorityQueue));

// TODO: These tests validate the expected behavior of TinyQueue + Mutex.
});

suite('withTimeout with Priority Queue', () => {
// const priorityQueue = new HeapHelper<QueueEntry>();

// TODO: These tests validate the expected behavior of TinyQueue + withTimeout.
});

13 changes: 9 additions & 4 deletions test/semaphore.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import * as assert from 'assert';
import assert from 'assert';

import { InstalledClock, install } from '@sinonjs/fake-timers';

import { E_CANCELED } from '../src/errors';
import Semaphore from '../src/Semaphore';
import Semaphore, { QueueLikeArray, QueueLikeSemaphore } from '../src/Semaphore';
import SemaphoreInterface from '../src/SemaphoreInterface';
import { withTimer } from './util';

export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) => SemaphoreInterface): void => {
export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error, queue?: QueueLikeSemaphore) => SemaphoreInterface): void => {
let semaphore: SemaphoreInterface;
let clock: InstalledClock;

Expand Down Expand Up @@ -262,10 +262,15 @@ export const semaphoreSuite = (factory: (maxConcurrency: number, err?: Error) =>
};

suite('Semaphore', () => {
semaphoreSuite((maxConcurrency: number, err?: Error) => new Semaphore(maxConcurrency, err));
semaphoreSuite((maxConcurrency: number, err?: Error, queue?: QueueLikeSemaphore) => new Semaphore(maxConcurrency, err, queue));

test('Semaphore constructor throws if value <= 0', () => {
assert.throws(() => new Semaphore(0));
assert.throws(() => new Semaphore(-1));
});
});

suite('Semaphore with Explicit Array Queues', () => {
const _queue = <QueueLikeArray>[];
semaphoreSuite((maxConcurrency: number, err?: Error) => new Semaphore(maxConcurrency, err, _queue));
});
2 changes: 1 addition & 1 deletion test/tryAcquire.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as assert from 'assert';
import assert from 'assert';

import { InstalledClock, install } from '@sinonjs/fake-timers';

Expand Down
2 changes: 1 addition & 1 deletion test/withTimeout.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import * as assert from 'assert';
import assert from 'assert';

import { InstalledClock, install } from '@sinonjs/fake-timers';

Expand Down
3 changes: 2 additions & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
"noImplicitAny": true,
"noImplicitReturns": true,
"noUnusedLocals": true,
"allowSyntheticDefaultImports": true
"allowSyntheticDefaultImports": true,
"esModuleInterop": true

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changing tsconfig seems more sensitive and more likely to not be accepted upstream, what do we gain from this configuration?

},
"include": ["src/**/*.ts"],
"exclude": ["node_modules"]
Expand Down