Skip to content

Commit

Permalink
Merge growing-bytes-backpressure into main
Browse files Browse the repository at this point in the history
  • Loading branch information
sgwilym committed May 29, 2024
2 parents d4ef262 + 68fbd2c commit 2639040
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 57 deletions.
59 changes: 21 additions & 38 deletions src/encoding/growing_bytes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,41 +12,26 @@ Deno.test("GrowingBytes (relative)", async () => {

fifo.push(new Uint8Array([0]));

await delay(0);

assertEquals(bytes.array, new Uint8Array([0]));
assertEquals(bytes.array, new Uint8Array());

fifo.push(new Uint8Array([1]));
fifo.push(new Uint8Array([2, 3]));

await delay(0);

assertEquals(bytes.array, new Uint8Array([0, 1, 2, 3]));

let received = new Uint8Array();
assertEquals(bytes.array, new Uint8Array());

bytes.nextRelative(4).then((bytes) => {
received = bytes;
});
const receivedBytes = await bytes.nextRelative(4);

fifo.push(new Uint8Array([4]));
await delay(0);
assertEquals(receivedBytes, new Uint8Array([0, 1, 2, 3]));

assertEquals(received, new Uint8Array());
const lastPromise = bytes.nextRelative(2);

fifo.push(new Uint8Array([5, 6]));
await delay(0);
assertEquals(bytes.array, new Uint8Array([0, 1, 2, 3]));

assertEquals(received, new Uint8Array());
fifo.push(new Uint8Array([4, 5]));

fifo.push(new Uint8Array([7]));
await delay(0);

assertEquals(received, new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]));

bytes.prune(4);

assertEquals(bytes.array, new Uint8Array([4, 5, 6, 7]));
assertEquals(await lastPromise, new Uint8Array([0, 1, 2, 3, 4, 5]));
});

Deno.test("GrowingBytes (absolute)", async () => {
Expand All @@ -60,37 +45,35 @@ Deno.test("GrowingBytes (absolute)", async () => {

await delay(0);

assertEquals(bytes.array, new Uint8Array([0]));
assertEquals(bytes.array, new Uint8Array());

fifo.push(new Uint8Array([1]));
fifo.push(new Uint8Array([2, 3]));

await delay(0);

assertEquals(bytes.array, new Uint8Array([0, 1, 2, 3]));
assertEquals(bytes.array, new Uint8Array());

let received = new Uint8Array();
const receivedBytes = await bytes.nextAbsolute(4);

bytes.nextAbsolute(8).then((bytes) => {
received = bytes;
});
assertEquals(receivedBytes, new Uint8Array([0, 1, 2, 3]));

bytes.prune(4);

fifo.push(new Uint8Array([4]));
await delay(0);
fifo.push(new Uint8Array([5, 6]));

assertEquals(received, new Uint8Array());
assertEquals(bytes.array, new Uint8Array());

const lastPromise = bytes.nextAbsolute(4);

fifo.push(new Uint8Array([5, 6]));
await delay(0);

assertEquals(received, new Uint8Array());
assertEquals(bytes.array, new Uint8Array([4, 5, 6]));

fifo.push(new Uint8Array([7]));
await delay(0);

assertEquals(received, new Uint8Array([0, 1, 2, 3, 4, 5, 6, 7]));

bytes.prune(4);
await delay(0);

assertEquals(bytes.array, new Uint8Array([4, 5, 6, 7]));
assertEquals(await lastPromise, new Uint8Array([4, 5, 6, 7]));
});
45 changes: 26 additions & 19 deletions src/encoding/growing_bytes.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,21 @@
import { concat } from "@std/bytes";

/** An array of growing bytes which can be awaited upon and pruned. */
/** A bytestring which, upon request, pulls bytes from an asynchronous source of bytes. */
export class GrowingBytes {
/** All received bytes. */
array: Uint8Array = new Uint8Array();

private hasUnfulfilledRequests = Promise.withResolvers<void>();

private deferredUntilLength:
| [number, PromiseWithResolvers<Uint8Array>]
| null = null;

/** Create a new {@linkcode GrowingBytes} from a stream of bytes. */
constructor(incoming: AsyncIterable<Uint8Array>) {
constructor(readonly incoming: AsyncIterable<Uint8Array>) {
(async () => {
await this.hasUnfulfilledRequests.promise;

for await (const chunk of incoming) {
this.array = concat([this.array, chunk]);

Expand All @@ -21,43 +25,46 @@ export class GrowingBytes {
) {
this.deferredUntilLength[1].resolve(this.array);
this.deferredUntilLength = null;
this.hasUnfulfilledRequests = Promise.withResolvers<void>();
}

await this.hasUnfulfilledRequests.promise;
}
})();
}

/** Wait for the underyling Uint8Array to grow relative to the given length, regardless of the current length. */
/** Attempt to pull bytes from the underlying source until the accumulated bytestring has grown by the given amount of bytes.
*
* @param length - The number of bytes to pull from the underlying source until returning.
* @returns The accumulated bytestring after having pulled the given number of bytes.
*/
nextRelative(length: number): Promise<Uint8Array> {
return this.next(length, true);
const target = this.array.byteLength + length;
return this.nextAbsolute(target);
}

/** Wait for the underyling Uint8Array to grow to at least the absolute given length. */
/** Attempt to pull bytes from the underlying source until the accumulated bytestring has grown to the given size.
*
* @param length - The size the accumulated bytestring must have reached before returning.
* @returns The accumulated bytestring after having reached the given bytelength.
*/
nextAbsolute(length: number): Promise<Uint8Array> {
if (this.array.byteLength >= length) {
return Promise.resolve(this.array);
}

return this.next(length, false);
}

private next(ofLength: number, relative: boolean): Promise<Uint8Array> {
const target = relative ? this.array.byteLength + ofLength : ofLength;
this.hasUnfulfilledRequests.resolve();

if (
this.deferredUntilLength &&
this.deferredUntilLength[0] === target
) {
if (this.deferredUntilLength && this.deferredUntilLength[0] === length) {
return this.deferredUntilLength[1].promise;
}

const promiseWithResolvers = Promise.withResolvers<Uint8Array>();

this.deferredUntilLength = [
target,
promiseWithResolvers,
length,
Promise.withResolvers<Uint8Array>(),
];

return promiseWithResolvers.promise;
return this.deferredUntilLength[1].promise;
}

/** Prunes the array by the given bytelength. */
Expand Down

0 comments on commit 2639040

Please sign in to comment.