Skip to content

Commit

Permalink
Merge pull request #101 from MattiasBuelens/loosen-brand-checks
Browse files Browse the repository at this point in the history
Loosen brand checks on ReadableStream methods
  • Loading branch information
MattiasBuelens authored Nov 17, 2021
2 parents d515169 + 704191d commit e10fb3c
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 66 deletions.
6 changes: 0 additions & 6 deletions src/lib/abstract-ops/ecmascript.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,3 @@
export function CreateArrayFromList<T extends any[]>(elements: T): T {
// We use arrays to represent lists, so this is basically a no-op.
// Do a slice though just in case we happen to depend on the unique-ness.
return elements.slice() as T;
}

export function CopyDataBlockBytes(dest: ArrayBuffer,
destOffset: number,
src: ArrayBuffer,
Expand Down
111 changes: 111 additions & 0 deletions src/lib/helpers/stream-like.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import { typeIsObject } from './miscellaneous';
import assert from '../../stub/assert';
import type { ReadableStreamBYOBReadResult, ReadableStreamDefaultReadResult } from '../readable-stream';

// For methods like ReadableStream.pipeTo() or .tee(), we want the polyfill's implementation to be compatible
// with *any* streams implementation, including native streams or a different polyfill.
// These types define the minimal API that the polyfill *may* expect from such implementations.
// Importantly, these do not have any internal fields and cannot be used by the internal abstract ops,
// so the polyfill can *only* use their public API.

export interface ReadableStreamLike<R = any> {
readonly locked: boolean;

getReader(): ReadableStreamDefaultReaderLike<R>;
}

export interface ReadableByteStreamLike extends ReadableStreamLike<Uint8Array> {
getReader({ mode }: { mode: 'byob' }): ReadableStreamBYOBReaderLike;

getReader(): ReadableStreamDefaultReaderLike<Uint8Array>;
}

export interface ReadableStreamDefaultReaderLike<R = any> {
readonly closed: Promise<void>;

cancel(reason?: any): Promise<void>;

read(): Promise<ReadableStreamDefaultReadResult<R>>;

releaseLock(): void;
}

export interface ReadableStreamBYOBReaderLike {
readonly closed: Promise<void>;

cancel(reason?: any): Promise<void>;

read<T extends ArrayBufferView>(view: T): Promise<ReadableStreamBYOBReadResult<T>>;

releaseLock(): void;
}

export type ReadableStreamReaderLike<R = any> = ReadableStreamDefaultReaderLike<R> | ReadableStreamBYOBReaderLike;

export interface WritableStreamLike<W = any> {
readonly locked: boolean;

getWriter(): WritableStreamDefaultWriterLike<W>;
}

interface WritableStreamDefaultWriterLike<W = any> {
readonly closed: Promise<undefined>;
readonly desiredSize: number | null;
readonly ready: Promise<undefined>;

abort(reason?: any): Promise<void>;

close(): Promise<void>;

releaseLock(): void;

write(chunk: W): Promise<void>;
}

export function IsReadableStreamLike(x: unknown): x is ReadableStreamLike {
if (!typeIsObject(x)) {
return false;
}
if (typeof (x as ReadableStreamLike).getReader !== 'function') {
return false;
}
try {
// noinspection SuspiciousTypeOfGuard
return typeof (x as ReadableStreamLike).locked === 'boolean';
} catch {
// ReadableStream.prototype.locked may throw if its brand check fails
return false;
}
}

export function IsReadableByteStreamLike(x: ReadableStreamLike): x is ReadableByteStreamLike {
assert(IsReadableStreamLike(x));

// This brand check only works for unlocked streams.
// If the stream is locked, getReader() will throw even if "byob" is actually supported.
assert(!x.locked);

try {
(x as ReadableByteStreamLike).getReader({ mode: 'byob' }).releaseLock();
return true;
} catch {
// getReader() throws if mode is not supported
return false;
}
}

export function IsWritableStreamLike(x: unknown): x is WritableStreamLike {
if (!typeIsObject(x)) {
return false;
}
if (typeof (x as WritableStreamLike).getWriter !== 'function') {
return false;
}
try {
// noinspection SuspiciousTypeOfGuard
return typeof (x as WritableStreamLike).locked === 'boolean';
} catch {
// WritableStream.prototype.locked may throw if its brand check fails
return false;
}
}
42 changes: 24 additions & 18 deletions src/lib/readable-stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ import {
import { ReadableStreamPipeTo } from './readable-stream/pipe';
import { ReadableStreamTee } from './readable-stream/tee';
import type { WritableStream } from './writable-stream';
import { IsWritableStream, IsWritableStreamLocked } from './writable-stream';
import type { ReadableStreamLike, WritableStreamLike } from './helpers/stream-like';
import { IsReadableStreamLike, IsWritableStreamLike } from './helpers/stream-like';
import { SimpleQueue } from './simple-queue';
import {
ReadableByteStreamController,
Expand All @@ -47,7 +48,6 @@ import {
} from './readable-stream/underlying-source';
import { noop } from '../utils';
import { typeIsObject } from './helpers/miscellaneous';
import { CreateArrayFromList } from './abstract-ops/ecmascript';
import { CancelSteps } from './abstract-ops/internal-methods';
import { IsNonNegativeNumber } from './abstract-ops/miscellaneous';
import { assertObject, assertRequiredArgument } from './validators/basic';
Expand Down Expand Up @@ -201,22 +201,23 @@ export class ReadableStream<R = any> {
transform: { readable: RS; writable: WritableStream<R> },
options?: StreamPipeOptions
): RS;
pipeThrough<RS extends ReadableStream>(
rawTransform: { readable: RS; writable: WritableStream<R> } | null | undefined,
pipeThrough<RS extends ReadableStreamLike>(
this: ReadableStreamLike<R>,
rawTransform: { readable: RS; writable: WritableStreamLike<R> } | null | undefined,
rawOptions: StreamPipeOptions | null | undefined = {}
): RS {
if (!IsReadableStream(this)) {
if (!IsReadableStreamLike(this)) {
throw streamBrandCheckException('pipeThrough');
}
assertRequiredArgument(rawTransform, 1, 'pipeThrough');

const transform = convertReadableWritablePair(rawTransform, 'First parameter');
const options = convertPipeOptions(rawOptions, 'Second parameter');

if (IsReadableStreamLocked(this)) {
if (this.locked) {
throw new TypeError('ReadableStream.prototype.pipeThrough cannot be used on a locked ReadableStream');
}
if (IsWritableStreamLocked(transform.writable)) {
if (transform.writable.locked) {
throw new TypeError('ReadableStream.prototype.pipeThrough cannot be used on a locked WritableStream');
}

Expand All @@ -237,16 +238,17 @@ export class ReadableStream<R = any> {
* Piping a stream will lock it for the duration of the pipe, preventing any other consumer from acquiring a reader.
*/
pipeTo(destination: WritableStream<R>, options?: StreamPipeOptions): Promise<void>;
pipeTo(destination: WritableStream<R> | null | undefined,
pipeTo(this: ReadableStreamLike<R>,
destination: WritableStreamLike<R> | null | undefined,
rawOptions: StreamPipeOptions | null | undefined = {}): Promise<void> {
if (!IsReadableStream(this)) {
if (!IsReadableStreamLike(this)) {
return promiseRejectedWith(streamBrandCheckException('pipeTo'));
}

if (destination === undefined) {
return promiseRejectedWith(`Parameter 1 is required in 'pipeTo'.`);
}
if (!IsWritableStream(destination)) {
if (!IsWritableStreamLike(destination)) {
return promiseRejectedWith(
new TypeError(`ReadableStream.prototype.pipeTo's first argument must be a WritableStream`)
);
Expand All @@ -259,12 +261,12 @@ export class ReadableStream<R = any> {
return promiseRejectedWith(e);
}

if (IsReadableStreamLocked(this)) {
if (this.locked) {
return promiseRejectedWith(
new TypeError('ReadableStream.prototype.pipeTo cannot be used on a locked ReadableStream')
);
}
if (IsWritableStreamLocked(destination)) {
if (destination.locked) {
return promiseRejectedWith(
new TypeError('ReadableStream.prototype.pipeTo cannot be used on a locked WritableStream')
);
Expand All @@ -286,13 +288,16 @@ export class ReadableStream<R = any> {
* Note that the chunks seen in each branch will be the same object. If the chunks are not immutable,
* this could allow interference between the two branches.
*/
tee(): [ReadableStream<R>, ReadableStream<R>] {
if (!IsReadableStream(this)) {
tee(): [ReadableStream<R>, ReadableStream<R>];
tee(this: ReadableStreamLike<R>): [ReadableStream<R>, ReadableStream<R>] {
if (!IsReadableStreamLike(this)) {
throw streamBrandCheckException('tee');
}
if (this.locked) {
throw new TypeError('Cannot tee a stream that already has a reader');
}

const branches = ReadableStreamTee(this, false);
return CreateArrayFromList(branches);
return ReadableStreamTee(this, false);
}

/**
Expand All @@ -307,8 +312,9 @@ export class ReadableStream<R = any> {
* `true` for the `preventCancel` option.
*/
values(options?: ReadableStreamIteratorOptions): ReadableStreamAsyncIterator<R>;
values(rawOptions: ReadableStreamIteratorOptions | null | undefined = undefined): ReadableStreamAsyncIterator<R> {
if (!IsReadableStream(this)) {
values(this: ReadableStreamLike<R>,
rawOptions: ReadableStreamIteratorOptions | null | undefined = undefined): ReadableStreamAsyncIterator<R> {
if (!IsReadableStreamLike(this)) {
throw streamBrandCheckException('values');
}

Expand Down
13 changes: 7 additions & 6 deletions src/lib/readable-stream/async-iterator.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
/// <reference lib="es2018.asynciterable" />

import type { ReadableStream } from '../readable-stream';
import type { ReadableStreamDefaultReader, ReadableStreamDefaultReadResult } from './default-reader';
import type { ReadableStreamDefaultReaderLike, ReadableStreamLike } from '../helpers/stream-like';
import type { ReadableStreamDefaultReadResult } from './default-reader';
import { IsReadableStreamDefaultReader } from './default-reader';
import { readerLockException } from './generic-reader';
import assert from '../../stub/assert';
import { typeIsObject } from '../helpers/miscellaneous';
Expand All @@ -19,12 +20,12 @@ export interface ReadableStreamAsyncIterator<R> extends AsyncIterator<R> {
}

export class ReadableStreamAsyncIteratorImpl<R> {
private _reader: ReadableStreamDefaultReader<R> | undefined;
private _reader: ReadableStreamDefaultReaderLike<R> | undefined;
private readonly _preventCancel: boolean;
private _ongoingPromise: Promise<ReadableStreamDefaultReadResult<R>> | undefined = undefined;
private _isFinished = false;

constructor(reader: ReadableStreamDefaultReader<R>, preventCancel: boolean) {
constructor(reader: ReadableStreamDefaultReaderLike<R>, preventCancel: boolean) {
this._reader = reader;
this._preventCancel = preventCancel;
}
Expand Down Expand Up @@ -82,7 +83,7 @@ export class ReadableStreamAsyncIteratorImpl<R> {
return promiseRejectedWith(readerLockException('finish iterating'));
}

assert(reader._readRequests.length === 0);
assert(!IsReadableStreamDefaultReader(reader) || reader._readRequests.length === 0);

this._reader = undefined;
if (!this._preventCancel) {
Expand Down Expand Up @@ -135,7 +136,7 @@ if (typeof Symbol.asyncIterator === 'symbol') {

// Abstract operations for the ReadableStream.

export function AcquireReadableStreamAsyncIterator<R>(stream: ReadableStream<R>,
export function AcquireReadableStreamAsyncIterator<R>(stream: ReadableStreamLike<R>,
preventCancel: boolean): ReadableStreamAsyncIterator<R> {
const reader = stream.getReader();
const impl = new ReadableStreamAsyncIteratorImpl(reader, preventCancel);
Expand Down
26 changes: 15 additions & 11 deletions src/lib/readable-stream/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import type { ReadableStream, ReadableStreamState } from '../readable-stream';
import type { ReadableStreamState } from '../readable-stream';
import { IsReadableStream } from '../readable-stream';
import type { WritableStream, WritableStreamState } from '../writable-stream';
import type { WritableStreamState } from '../writable-stream';
import { IsWritableStream } from '../writable-stream';
import type { ReadableStreamLike, WritableStreamLike } from '../helpers/stream-like';
import { IsReadableStreamLike, IsWritableStreamLike } from '../helpers/stream-like';
import assert from '../../stub/assert';
import {
newPromise,
Expand All @@ -17,14 +19,14 @@ import type { AbortSignal } from '../abort-signal';
import { isAbortSignal } from '../abort-signal';
import { DOMException } from '../../stub/dom-exception';

export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
dest: WritableStream<T>,
export function ReadableStreamPipeTo<T>(source: ReadableStreamLike<T>,
dest: WritableStreamLike<T>,
preventClose: boolean,
preventAbort: boolean,
preventCancel: boolean,
signal: AbortSignal | undefined): Promise<undefined> {
assert(IsReadableStream(source));
assert(IsWritableStream(dest));
assert(IsReadableStreamLike(source));
assert(IsWritableStreamLike(dest));
assert(typeof preventClose === 'boolean');
assert(typeof preventAbort === 'boolean');
assert(typeof preventCancel === 'boolean');
Expand All @@ -35,7 +37,9 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
const reader = source.getReader();
const writer = dest.getWriter();

source._disturbed = true;
if (IsReadableStream(source)) {
source._disturbed = true;
}

let shuttingDown = false;
let released = false;
Expand Down Expand Up @@ -134,7 +138,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
uponPromise(reader.closed, () => {
// Closing must be propagated forward
assert(!released);
assert(source._state === 'closed');
assert(!IsReadableStream(source) || source._state === 'closed');
sourceState = 'closed';
if (!preventClose) {
shutdownWithAction(() => {
Expand All @@ -157,7 +161,7 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,
return null;
}
// Errors must be propagated forward
assert(source._state === 'errored');
assert(!IsReadableStream(source) || source._state === 'errored');
sourceState = 'errored';
if (!preventAbort) {
shutdownWithAction(() => writer.abort(storedError), true, storedError);
Expand All @@ -169,15 +173,15 @@ export function ReadableStreamPipeTo<T>(source: ReadableStream<T>,

uponPromise(writer.closed, () => {
assert(!released);
assert(dest._state === 'closed');
assert(!IsWritableStream(dest) || dest._state === 'closed');
destState = 'closed';
return null;
}, storedError => {
if (released) {
return null;
}
// Errors must be propagated backward
assert(dest._state === 'errored');
assert(!IsWritableStream(dest) || dest._state === 'errored');
destState = 'errored';
destStoredError = storedError;
if (!preventCancel) {
Expand Down
Loading

0 comments on commit e10fb3c

Please sign in to comment.