-
-
Notifications
You must be signed in to change notification settings - Fork 32
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
8 changed files
with
118 additions
and
5 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
export const ponyfill = {}; | ||
|
||
const {prototype} = ReadableStream; | ||
|
||
// Use this library as a ponyfill instead of a polyfill. | ||
// I.e. avoid modifying global variables. | ||
// We can remove this once https://github.com/Sec-ant/readable-stream/issues/2 is fixed | ||
if (prototype[Symbol.asyncIterator] === undefined && prototype.values === undefined) { | ||
await import('@sec-ant/readable-stream'); | ||
ponyfill.asyncIterator = prototype[Symbol.asyncIterator]; | ||
delete prototype[Symbol.asyncIterator]; | ||
delete prototype.values; | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,23 @@ | ||
import {Duplex, Readable} from 'node:stream'; | ||
import {finished} from 'node:stream/promises'; | ||
|
||
export const createStream = streamDef => typeof streamDef === 'function' | ||
? Duplex.from(streamDef) | ||
: Readable.from(streamDef); | ||
|
||
// @todo Use ReadableStream.from() after dropping support for Node 18 | ||
export const readableStreamFrom = chunks => new ReadableStream({ | ||
start(controller) { | ||
for (const chunk of chunks) { | ||
controller.enqueue(chunk); | ||
} | ||
|
||
controller.close(); | ||
}, | ||
}); | ||
|
||
// Tests related to big buffers/strings can be slow. We run them serially and | ||
// with a higher timeout to ensure they do not randomly fail. | ||
export const BIG_TEST_DURATION = '2m'; | ||
|
||
export const onFinishedStream = stream => finished(stream, {cleanup: true}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
import test from 'ava'; | ||
|
||
// Emulate browsers that do not support those methods | ||
delete ReadableStream.prototype.values; | ||
delete ReadableStream.prototype[Symbol.asyncIterator]; | ||
|
||
// Run those tests, but emulating browsers | ||
await import('./web-stream.js'); | ||
|
||
test('Should not polyfill ReadableStream', t => { | ||
t.is(ReadableStream.prototype.values, undefined); | ||
t.is(ReadableStream.prototype[Symbol.asyncIterator], undefined); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,63 @@ | ||
import test from 'ava'; | ||
import getStream from '../source/index.js'; | ||
import {fixtureString, fixtureMultiString} from './fixtures/index.js'; | ||
import {readableStreamFrom, onFinishedStream} from './helpers/index.js'; | ||
|
||
test('Can use ReadableStream', async t => { | ||
const stream = readableStreamFrom(fixtureMultiString); | ||
t.is(await getStream(stream), fixtureString); | ||
await onFinishedStream(stream); | ||
}); | ||
|
||
test('Can use already ended ReadableStream', async t => { | ||
const stream = readableStreamFrom(fixtureMultiString); | ||
t.is(await getStream(stream), fixtureString); | ||
t.is(await getStream(stream), ''); | ||
await onFinishedStream(stream); | ||
}); | ||
|
||
test('Can use already canceled ReadableStream', async t => { | ||
let canceledValue; | ||
const stream = new ReadableStream({ | ||
cancel(canceledError) { | ||
canceledValue = canceledError; | ||
}, | ||
}); | ||
const error = new Error('test'); | ||
await stream.cancel(error); | ||
t.is(canceledValue, error); | ||
t.is(await getStream(stream), ''); | ||
await onFinishedStream(stream); | ||
}); | ||
|
||
test('Can use already errored ReadableStream', async t => { | ||
const error = new Error('test'); | ||
const stream = new ReadableStream({ | ||
start(controller) { | ||
controller.error(error); | ||
}, | ||
}); | ||
t.is(await t.throwsAsync(getStream(stream)), error); | ||
t.is(await t.throwsAsync(onFinishedStream(stream)), error); | ||
}); | ||
|
||
test('Cancel ReadableStream when maxBuffer is hit', async t => { | ||
let canceled = false; | ||
const stream = new ReadableStream({ | ||
start(controller) { | ||
controller.enqueue(fixtureString); | ||
controller.enqueue(fixtureString); | ||
controller.close(); | ||
}, | ||
cancel() { | ||
canceled = true; | ||
}, | ||
}); | ||
const error = await t.throwsAsync( | ||
getStream(stream, {maxBuffer: 1}), | ||
{message: /maxBuffer exceeded/}, | ||
); | ||
t.deepEqual(error.bufferedData, fixtureString[0]); | ||
await onFinishedStream(stream); | ||
t.true(canceled); | ||
}); |