Skip to content

Commit

Permalink
Support unlimited buffer for channels (#86)
Browse files Browse the repository at this point in the history
  • Loading branch information
Eyal-Shalev authored May 18, 2022
1 parent e30ce4b commit dc2ce52
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 19 deletions.
9 changes: 5 additions & 4 deletions src/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from "./internal/state_machine.ts";
import {
ignoreAbortedError,
isNonNegativeSafeInteger,
isSafeInteger,
makeAbortCtrl,
raceAbort,
} from "./internal/utils.ts";
Expand Down Expand Up @@ -85,16 +85,17 @@ export class Channel<T>
/**
* Constructs a new Channel with an optional buffer.
*
* @param {number} [bufferSize=0] A safe and positive integer representing the channel buffer size.
* @param {number} [bufferSize=0] A safe integer representing the channel buffer size.
* A `bufferSize` of `0` indicates a channel without any buffer.
* A negative `bufferSize` indicates a channel with an endless buffer.
* @param {ChannelOptions} [options]
*/
constructor(
readonly bufferSize: number = 0,
protected readonly options?: ChannelOptions,
) {
if (!isNonNegativeSafeInteger(bufferSize)) {
throw new RangeError("bufferSize must be a safe non-negative integer.");
if (!isSafeInteger(bufferSize)) {
throw new RangeError("bufferSize must be a safe integer.");
}
this.#state = Idle(this.debug.bind(this));
this.#queue = new Queue<T>(bufferSize);
Expand Down
2 changes: 1 addition & 1 deletion src/channel_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {
} from "deno/testing/asserts.ts";

Deno.test("invalid bufferSize", () => {
assertThrows(() => new Channel(-1), RangeError);
assertThrows(() => new Channel(0.5), RangeError);
});

Deno.test("no-buffer get-> send", async () => {
Expand Down
19 changes: 9 additions & 10 deletions src/internal/queue.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
import { isNonNegativeSafeInteger } from "./utils.ts";
import { isSafeInteger } from "./utils.ts";

/** @internal */
export class Queue<T> {
protected queue: T[] = [];
#queue: T[] = [];

constructor(readonly capacity: number) {
if (!isNonNegativeSafeInteger(capacity)) {
throw new RangeError(
"queue capacity must be a non-negative safe integer",
);
if (!isSafeInteger(capacity)) {
throw new RangeError("queue capacity must be a safe integer");
}
}

enqueue(val: T) {
if (this.isFull) throw new RangeError("queue is full");
this.queue.push(val);
this.#queue.push(val);
}

dequeue(): T {
if (this.isEmpty) throw new RangeError("queue is empty");
return this.queue.shift() as T;
return this.#queue.shift() as T;
}

get isFull(): boolean {
return this.queue.length === this.capacity;
if (this.capacity < 0) return false;
return this.#queue.length === this.capacity;
}

get isEmpty(): boolean {
return this.queue.length === 0;
return this.#queue.length === 0;
}
}
32 changes: 30 additions & 2 deletions src/internal/queue_test.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,35 @@
import { Queue } from "./queue.ts";
import { assertThrows } from "deno/testing/asserts.ts";
import { assert, assertEquals, assertThrows } from "deno/testing/asserts.ts";

Deno.test("invalid capacity", () => {
assertThrows(() => new Queue(-1), RangeError);
assertThrows(() => new Queue(0.1), RangeError);
});

Deno.test("Queue(1)", () => {
const q = new Queue(1);
assert(q.isEmpty && !q.isFull);
q.enqueue(0);
assert(!q.isEmpty && q.isFull);
assertThrows(() => q.enqueue(1), RangeError);
assertEquals(q.dequeue(), 0);
assertThrows(() => q.dequeue(), RangeError);
assert(q.isEmpty && !q.isFull);
});

Deno.test("Queue(0)", () => {
const q = new Queue(0);
assert(q.isEmpty && q.isFull);
assertThrows(() => q.enqueue(0), RangeError);
assertThrows(() => q.dequeue(), RangeError);
});

Deno.test("Queue(-1)", () => {
const q = new Queue(-1);
assert(q.isEmpty && !q.isFull);
q.enqueue(0);
q.enqueue(1);
assert(!q.isEmpty && !q.isFull);
assertEquals(q.dequeue(), 0);
assertEquals(q.dequeue(), 1);
assert(q.isEmpty && !q.isFull);
});
5 changes: 3 additions & 2 deletions src/pipe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ export * from "./subscribe.ts";
*/
export interface ChannelPipeOptions extends ChannelOptions {
/**
* A safe and positive integer representing the channel buffer size.
* A `bufferSize` of `0` indicates a channel without any buffer.
* A safe integer representing the channel buffer size.
* A `bufferSize` of `0` indicates a channel without any buffer.
* A negative `bufferSize` indicates a channel with an endless buffer.
* @type {number}
*/
bufferSize?: number;
Expand Down

0 comments on commit dc2ce52

Please sign in to comment.