Skip to content

Commit

Permalink
updates to async queue
Browse files Browse the repository at this point in the history
  • Loading branch information
randalf-sr committed Aug 16, 2024
1 parent fdf4d5f commit 825dbb0
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 44 deletions.
4 changes: 2 additions & 2 deletions deno.json
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
{
"name": "@rdtlabs/ts-utils",
"version": "0.2.34",
"version": "0.2.35",
"exports": {
".": "./src/mod.ts",
".": "./src/index.ts",
"./async": "./src/async/index.ts",
"./buffer": "./src/buffer/index.ts",
"./cancellation": "./src/cancellation/index.ts",
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "rdt-utils",
"version": "0.2.34",
"version": "0.2.35",
"description": "Library of typescript utilities",
"main": "./dist/index.js",
"types": "./dist/index.d.ts",
Expand Down
52 changes: 52 additions & 0 deletions src/async/queue/asyncQueue.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,58 @@ import { assert } from "https://deno.land/[email protected]/assert/assert.ts";
import { QueueClosedError, QueueFullError, QueueReadOnlyError } from "./errors.ts";
import { assertRejects } from "https://deno.land/[email protected]/assert/assert_rejects.ts";
import { asyncQueue } from './asyncQueue.ts';
import { waitGroup } from "../WaitGroup.ts";

Deno.test("AsyncQueue on dequeue test", async () => {
let dequeued = 0;
let queued = 0;
let dequeuedOnce = 0;
let queuedOnce = 0;

const wg = waitGroup(8);

const queue = asyncQueue<number>();

queue.on("dequeue", () => {
dequeued++;
wg.done();
});

queue.on("enqueue", () => {
queued++;
wg.done();
});

queue.on("dequeue", () => {
dequeuedOnce++;
wg.done();
}, true);

queue.on("enqueue", () => {
queuedOnce++;
wg.done();
}, true);

queue.enqueue(1);
queue.enqueue(2);
queue.enqueue(3);

queueMicrotask(() => {
queue.dequeue();
queue.dequeue();
queue.dequeue();
});

await wg.wait();

assert(queued === 3);
assert(dequeued === 3);
assert(queuedOnce === 1);
assert(dequeuedOnce === 1);

queue.close()
await queue.onClose();
});

Deno.test("AsyncQueue enqueue full error", async () => {
const queue = asyncQueue<number>({
Expand Down
143 changes: 115 additions & 28 deletions src/async/queue/asyncQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
import { Deferred } from "../Deferred.ts";
import { __getBufferFromOptions, __getQueueResolvers } from "./_utils.ts";
import type { AsyncQueue, QueueOptions } from "./types.ts";
import type { MaybeResult } from "../../types.ts";
import type { ErrorLike, MaybeResult } from "../../types.ts";
import type { CancellationToken } from "../../cancellation/CancellationToken.ts";

type QueueState = "rw" | "r" | "-rw";
Expand Down Expand Up @@ -77,25 +77,74 @@ export function asyncQueue<T>(
): AsyncQueue<T> {
const { dequeueResolvers, enqueueResolver } = __getQueueResolvers<T>();
const _buffer = __getBufferFromOptions<T>(options);
const _onEnqueue = new Array<{ cb: (item: T) => void; once: boolean }>();
const _onDequeue = new Array<{ cb: (item: T) => void; once: boolean }>();
let _onClose: Deferred<void> | undefined;
let _state: 0 | 1 | 2 = 0;

function enqueueUnsafe(item: T): void {
while (!dequeueResolvers.isEmpty) {
const resolver = dequeueResolvers.dequeue()!;
if (!resolver.getIsCancelled()) {
return resolver.resolve(item);
return resolver.resolve(notifyListeners(item, _onEnqueue));
}
}

try {
_buffer.write(item);
notifyListeners(item, _onEnqueue);
// deno-lint-ignore no-explicit-any
} catch (e: any) {
throw e.name === "BufferFullError" ? new QueueFullError() : e;
}
}

function notifyListeners(
item: T,
listeners: Array<{ cb: (item: T) => void; once: boolean }>,
): T {
// reverse for loop to allow for splicing and act on last listener first semantic
for (let i = listeners.length - 1; i >= 0; i--) {
const listener = listeners[i];
if (listener.once) {
listeners.splice(i, 1);
}
queueMicrotask(() => listener.cb(item));
}

return item;
}

function attachEvent(
promise: Promise<T>,
listeners: Array<{ cb: (item: T) => void; once: boolean }>,
) {
if (listeners.length === 0) {
return promise;
}

return promise.then((item) => notifyListeners(item, listeners));
}

function _on(
listeners: Array<{ cb: (item: T) => void; once: boolean }>,
listener: (item: T) => void,
once?: boolean,
): void {
_off(listeners, listener); // remove if the listeners is already attached
listeners.push({ cb: listener, once: !!once });
}

function _off(
listeners: Array<{ cb: (item: T) => void; once: boolean }>,
listener: (item: T) => void,
): void {
const index = listeners.findIndex((l) => l.cb === listener);
if (index !== -1) {
listeners.splice(index, 1);
}
}

const queue = {
get isClosed(): boolean {
return _state === 2;
Expand All @@ -112,8 +161,27 @@ export function asyncQueue<T>(
get state(): QueueState {
return _state === 0 ? "rw" : _state === 1 ? "r" : "-rw";
},
close(): void {
this[Symbol.dispose]();
close(err?: ErrorLike): void {
if (_state === 2) {
return;
}

_state = 2;
if (err) {
(_onClose ??= new Deferred<void>()).reject(err);
} else {
_onClose?.resolve();
}

_buffer.clear();
_onEnqueue.length = 0;
_onDequeue.length = 0;

for (const resolver of dequeueResolvers.toBufferLike()) {
if (!resolver.getIsCancelled()) {
resolver.reject(new QueueClosedError());
}
}
},
onClose(): Promise<void> {
if (!_onClose) {
Expand All @@ -132,7 +200,9 @@ export function asyncQueue<T>(

_state = 1;
if (queue.isEmpty) {
this[Symbol.dispose]();
this.close();
} else {
_onEnqueue.length = 0;
}
},
tryEnqueue(item: T): boolean {
Expand Down Expand Up @@ -171,20 +241,27 @@ export function asyncQueue<T>(
}

if (!queue.isEmpty) {
return Promise.resolve(_buffer.read()!);
return attachEvent(Promise.resolve(_buffer.read()!), _onDequeue);
}

if (_state === 0) {
if (!cancellationToken || cancellationToken.state === "none") {
return new Promise<T>(enqueueResolver);
return attachEvent(new Promise<T>(enqueueResolver), _onDequeue);
}

return new Promise<T>((resolve, reject) =>
enqueueResolver(resolve, reject, () => cancellationToken.isCancelled)
return attachEvent(
new Promise<T>((resolve, reject) =>
enqueueResolver(
resolve,
reject,
() => cancellationToken.isCancelled,
)
),
_onDequeue,
);
}

this[Symbol.dispose]();
this.close();

return Promise.reject(
new QueueClosedError(
Expand All @@ -198,32 +275,20 @@ export function asyncQueue<T>(
}

if (!queue.isEmpty) {
return { value: _buffer.read()!, ok: true };
return {
value: notifyListeners(_buffer.read()!, _onDequeue),
ok: true,
};
}

if (_state === 1) {
this[Symbol.dispose]();
this.close();
}

return { ok: false };
},
[Symbol.dispose](): void {
if (_state === 2) {
return;
}

_state = 2;
_buffer.clear();

for (const resolver of dequeueResolvers.toBufferLike()) {
if (!resolver.getIsCancelled()) {
resolver.reject(new QueueClosedError());
}
}

if (_onClose) {
_onClose.resolve();
}
queue.close();
},
[Symbol.asyncIterator](): AsyncIterator<T> {
return {
Expand All @@ -239,6 +304,28 @@ export function asyncQueue<T>(
},
} as AsyncIterator<T>;
},
on(
event: "dequeue" | "enqueue",
listener: (item: T) => void,
once?: boolean,
): void {
if (_state === 2) {
throw new QueueClosedError();
}

if (event === "dequeue") {
_on(_onDequeue, listener, once);
} else if (_state !== 1) {
_on(_onEnqueue, listener, once);
}
},
off(event: "dequeue" | "enqueue", listener: (item: T) => void): void {
if (_state === 2) {
throw new QueueClosedError();
}

_off(event === "enqueue" ? _onEnqueue : _onDequeue, listener);
},
};

return queue;
Expand Down
35 changes: 27 additions & 8 deletions src/async/queue/types.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { BufferStrategyOptions } from "../../buffer/BufferLike.ts";
import type { MaybeResult } from "../../types.ts";
import type { ErrorLike, MaybeResult } from "../../types.ts";
import { asyncQueue } from "./asyncQueue.ts";
import type { CancellationToken } from "../../cancellation/CancellationToken.ts";

Expand Down Expand Up @@ -29,19 +29,19 @@ export interface AsyncQueue<T> extends Disposable, AsyncIterable<T> {
* "r": The queue is in a read-only state, items can only be dequeued.
* "-rw": The queue is in a state neither enqueuing/dequeuing is allowed.
*/
readonly state: "rw" | "r" | "-rw";
get state(): "rw" | "r" | "-rw";

/** The current number of items in the queue */
readonly size: number;
get size(): number;

/** Returns whether the queue is empty (true) or not (false) */
readonly isEmpty: boolean;
get isEmpty(): boolean;

/** Returns whether the queue has been closed (true) or is still open (false) */
readonly isClosed: boolean;
get isClosed(): boolean;

/** Returns whether the queue is full (true) or not (false) */
readonly isFull: boolean;
get isFull(): boolean;

/**
* Enqueues an item of type T to the queue.
Expand All @@ -67,7 +67,26 @@ export interface AsyncQueue<T> extends Disposable, AsyncIterable<T> {
dequeue(cancellationToken?: CancellationToken): Promise<T>;

/**
* Synchronously dequeues an item from the queue if the queue us not empty, otherwise it will return an object with the value undefined and ok set to false.
* Event handler for when an item is enqueued/dequeued from the queue.
* @param event The event to listen for.
* @param listener The listener to remove.
* @param once If true, the listener will only be called once.
*/
on(
event: "dequeue" | "enqueue",
listener: (item: T) => void,
once?: boolean,
): void;

/**
* Event handler for when an item is enqueued/dequeued from the queue.
* @param event The event to listen for.
* @param listener The listener to remove.
*/
off(event: "dequeue" | "enqueue", listener: (item: T) => void): void;

/**
* Synchronously dequeues an item from the queue if the queue is not empty, otherwise it will return an object with the value undefined and ok set to false.
*/
tryDequeue(): MaybeResult<T>;

Expand All @@ -78,7 +97,7 @@ export interface AsyncQueue<T> extends Disposable, AsyncIterable<T> {
setReadOnly(): void;

/** Closes the queue. */
close(): void;
close(err?: ErrorLike): void;

/** Returns a promise that resolves when the queue is closed. */
onClose(): Promise<void>;
Expand Down
2 changes: 1 addition & 1 deletion src/cancellation/CancellablePromise.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ export async function cancellablePromise<T>(
} catch (e) {
if (e instanceof CancellationError) {
if (onCancel) {
queueMicrotask(() => onCancel(e));
queueMicrotask(() => onCancel(e as CancellationError));
}

if (defaultValueOnCancel) {
Expand Down
Loading

0 comments on commit 825dbb0

Please sign in to comment.