From 53be8edc148d14d424893eb2875a41046e09f6b4 Mon Sep 17 00:00:00 2001 From: Nikolay Botev Date: Fri, 6 Sep 2024 13:58:47 +0300 Subject: [PATCH] API 1.0 Changes (#31) * New factory API in object-oriented style * Pipe util, interval source, string stream and tests Extract pipe function into util/pipe Fix string stream More tests Add Interval source * map() conformance; first() and last() API changes Fix map() to conform to AsyncIteratorHelp proposal Make predicate optional in first() and last() * Rename AsyncStream -> AsyncIterableStream * Rename polyfill method asyncStream -> streamAsync * Moved stream sources to source/ * Decouple sources from AsyncIteratorStream * Clean up polyfills * Polyfills are actually factories * Add Scheduler abstraction to interval source * Core-js, range tests, sources naming convention * Polyfill Iterator Helpers for test if necessary * Rename source to sources * Fix editor tab size for vscode * Add Event source * Fix pipe.close() to notify waiting next()s * Readable release on iterator end and back-pressure * Rename readable APIs * Rename readable files * AsyncStream -> AsyncIterableStream in comments and related identifiers * AsyncIterableStream -> AsyncIteratorStream * ReadableSplit error handling * Make readableSplit and fromEvent lazy (like rxjs) * Readable.chunks * Use Genearator types and start/next/stop pattern * AsyncGenerator factory from {next, start, stop} functions * readableLines2 - simpler implementation * Introducing IteratorStream * Add sync IteratorHelper protocol * Move test_cli.ts from src/ to new demos/ folder (not packaged) * Bug fixes, improvements, reorg * Hyphenated folder names et al * Fix comment in unit test --- .vscode/launch.json | 23 + .vscode/settings.json | 3 +- README.md | 2 +- demos/test_cli.mjs | 20 + eslint.config.mjs | 8 + package-lock.json | 70 ++- package.json | 6 +- src/async-iterator-stream.test.ts | 44 ++ src/async-iterator-stream.ts | 557 ++++++++++++++++++++ src/behavior.test.ts | 50 +- src/factories/async-iterator.test.ts | 34 ++ src/factories/async-iterator.ts | 19 + src/factories/index.ts | 4 + src/factories/iterator.test.ts | 166 ++++++ src/factories/iterator.ts | 26 + src/factories/readable.ts | 30 ++ src/factories/string.ts | 34 ++ src/generator.test.ts | 100 ++++ src/generator.test22.ts | 63 --- src/index.test.ts | 11 - src/index.ts | 556 +------------------ src/iterator-stream.ts | 510 ++++++++++++++++++ src/polyfill/array.ts | 11 - src/polyfill/asyncGenerator.test.ts | 14 - src/polyfill/asyncGenerator.ts | 15 - src/polyfill/generator.test.ts | 64 --- src/polyfill/generator.ts | 27 - src/polyfill/index.ts | 6 - src/polyfill/map.ts | 21 - src/polyfill/readable.ts | 20 - src/polyfill/set.ts | 11 - src/polyfill/string.ts | 11 - src/polyfill/typedArray.ts | 59 --- src/random.ts | 11 - src/range.ts | 25 - src/readableAsyncIterator.ts | 85 --- src/repeated.ts | 10 - src/secureRandom.ts | 20 - src/sources/async-generator.ts | 27 + src/sources/event.ts | 78 +++ src/sources/interval.ts | 39 ++ src/sources/random.ts | 9 + src/sources/range.test.ts | 97 ++++ src/sources/range.ts | 39 ++ src/sources/readable-chunks.ts | 50 ++ src/{lines.ts => sources/readable-lines.ts} | 14 +- src/sources/readable-lines2.ts | 17 + src/sources/readable-split.ts | 26 + src/sources/repeated.ts | 11 + src/sources/secure-random.test.ts | 10 + src/sources/secure-random.ts | 17 + src/test_cli.ts | 27 - src/util/pipe.ts | 60 +++ tsconfig.json | 2 +- 54 files changed, 2184 insertions(+), 1085 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 demos/test_cli.mjs create mode 100644 src/async-iterator-stream.test.ts create mode 100644 src/async-iterator-stream.ts create mode 100644 src/factories/async-iterator.test.ts create mode 100644 src/factories/async-iterator.ts create mode 100644 src/factories/index.ts create mode 100644 src/factories/iterator.test.ts create mode 100644 src/factories/iterator.ts create mode 100644 src/factories/readable.ts create mode 100644 src/factories/string.ts create mode 100644 src/generator.test.ts delete mode 100644 src/generator.test22.ts delete mode 100644 src/index.test.ts create mode 100644 src/iterator-stream.ts delete mode 100644 src/polyfill/array.ts delete mode 100644 src/polyfill/asyncGenerator.test.ts delete mode 100644 src/polyfill/asyncGenerator.ts delete mode 100644 src/polyfill/generator.test.ts delete mode 100644 src/polyfill/generator.ts delete mode 100644 src/polyfill/index.ts delete mode 100644 src/polyfill/map.ts delete mode 100644 src/polyfill/readable.ts delete mode 100644 src/polyfill/set.ts delete mode 100644 src/polyfill/string.ts delete mode 100644 src/polyfill/typedArray.ts delete mode 100644 src/random.ts delete mode 100644 src/range.ts delete mode 100644 src/readableAsyncIterator.ts delete mode 100644 src/repeated.ts delete mode 100644 src/secureRandom.ts create mode 100644 src/sources/async-generator.ts create mode 100644 src/sources/event.ts create mode 100644 src/sources/interval.ts create mode 100644 src/sources/random.ts create mode 100644 src/sources/range.test.ts create mode 100644 src/sources/range.ts create mode 100644 src/sources/readable-chunks.ts rename src/{lines.ts => sources/readable-lines.ts} (60%) create mode 100644 src/sources/readable-lines2.ts create mode 100644 src/sources/readable-split.ts create mode 100644 src/sources/repeated.ts create mode 100644 src/sources/secure-random.test.ts create mode 100644 src/sources/secure-random.ts delete mode 100644 src/test_cli.ts create mode 100644 src/util/pipe.ts diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..c455c5f --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,23 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "type": "node", + "request": "launch", + "name": "test_cli", + "skipFiles": ["/**"], + "program": "${workspaceFolder}/test_cli.js", + "preLaunchTask": "tsc: build - tsconfig.json", + "outFiles": ["${workspaceFolder}/./**/*.js"] + }, + { + "type": "node", + "request": "attach", + "name": "Attach by Process ID", + "processId": "${command:PickProcess}" + } + ] +} diff --git a/.vscode/settings.json b/.vscode/settings.json index 47e87cf..7442082 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -3,5 +3,6 @@ "editor.formatOnSave": true, "editor.formatOnType": true, "editor.defaultFormatter": "esbenp.prettier-vscode", - "editor.rulers": [80] + "editor.rulers": [80], + "editor.tabSize": 2 } diff --git a/README.md b/README.md index 16503fe..b07cbb0 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,7 @@ A lazy streams library for functional composition in JavaScript. ```javascript -import "streams/polyfill"; +import "streams/factories"; process.stdin .streamLines() diff --git a/demos/test_cli.mjs b/demos/test_cli.mjs new file mode 100644 index 0000000..e7c4c9d --- /dev/null +++ b/demos/test_cli.mjs @@ -0,0 +1,20 @@ +import "../factories/index.js"; + +console.log("Start", process.stdin.readableFlowing); + +process.stdin + .lines() + .stream() + .take(2) + .map((s) => s.substring(0, 2).toLocaleLowerCase()) + .forEach((s) => console.log(s, process.stdin.readableFlowing)) + .then(() => { + console.log("- stdin is", process.stdin.readableFlowing); + process.stdin + .lines() + .stream() + .take(1) + .map((s) => s.toUpperCase()) + .forEach((s) => console.log(s, process.stdin.readableFlowing)) + .then(() => console.log("End", process.stdin.readableFlowing)); + }); diff --git a/eslint.config.mjs b/eslint.config.mjs index a91d34e..5bf8846 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -1,12 +1,20 @@ // @ts-check import eslint from "@eslint/js"; +import globals from "globals"; import tseslint from "typescript-eslint"; export default tseslint.config( eslint.configs.recommended, ...tseslint.configs.recommended, { + linterOptions: {}, + languageOptions: { + ecmaVersion: 2023, + globals: { + ...globals.node, + }, + }, rules: { "@typescript-eslint/no-unused-vars": [ "error", diff --git a/package-lock.json b/package-lock.json index ea2c52d..8ae8981 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,11 +10,14 @@ "license": "MIT", "devDependencies": { "@eslint/js": "^9.9.1", + "@types/core-js": "^2.5.8", "@types/eslint__js": "^8.42.3", "@types/jest": "^29.5.12", "@types/node": "^22.5.3", + "core-js": "^3.38.1", "del-cli": "^5.1.0", "eslint": "^9.9.1", + "globals": "^15.9.0", "jest": "^29.7.0", "npm-run-all": "^4.1.5", "prettier": "^3.3.3", @@ -536,6 +539,16 @@ "node": ">=6.9.0" } }, + "node_modules/@babel/traverse/node_modules/globals": { + "version": "11.12.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", + "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", + "dev": true, + "license": "MIT", + "engines": { + "node": ">=4" + } + }, "node_modules/@babel/types": { "version": "7.23.0", "resolved": "https://registry.npmjs.org/@babel/types/-/types-7.23.0.tgz", @@ -1595,6 +1608,13 @@ "@babel/types": "^7.20.7" } }, + "node_modules/@types/core-js": { + "version": "2.5.8", + "resolved": "https://registry.npmjs.org/@types/core-js/-/core-js-2.5.8.tgz", + "integrity": "sha512-VgnAj6tIAhJhZdJ8/IpxdatM8G4OD3VWGlp6xIxUGENZlpbob9Ty4VVdC1FIEp0aK6DBscDDjyzy5FB60TuNqg==", + "dev": true, + "license": "MIT" + }, "node_modules/@types/eslint": { "version": "9.6.1", "resolved": "https://registry.npmjs.org/@types/eslint/-/eslint-9.6.1.tgz", @@ -2503,6 +2523,18 @@ "integrity": "sha512-Kvp459HrV2FEJ1CAsi1Ku+MY3kasH19TFykTz2xWmMeq6bk2NU3XXvfJ+Q61m0xktWwt+1HSYf3JZsTms3aRJg==", "dev": true }, + "node_modules/core-js": { + "version": "3.38.1", + "resolved": "https://registry.npmjs.org/core-js/-/core-js-3.38.1.tgz", + "integrity": "sha512-OP35aUorbU3Zvlx7pjsFdu1rGNnD4pgw/CWoYzRY3t2EzoVT7shKHY1dlAy3f41cGIO7ZDPQimhGFTlEYkG/Hw==", + "dev": true, + "hasInstallScript": true, + "license": "MIT", + "funding": { + "type": "opencollective", + "url": "https://opencollective.com/core-js" + } + }, "node_modules/create-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/create-jest/-/create-jest-29.7.0.tgz", @@ -3582,12 +3614,16 @@ } }, "node_modules/globals": { - "version": "11.12.0", - "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", - "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", + "version": "15.9.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-15.9.0.tgz", + "integrity": "sha512-SmSKyLLKFbSr6rptvP8izbyxJL4ILwqO9Jg23UA0sDlGlu58V59D1//I3vlc0KJphVdUR7vMjHIplYnzBxorQA==", "dev": true, + "license": "MIT", "engines": { - "node": ">=4" + "node": ">=18" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" } }, "node_modules/globby": { @@ -7949,6 +7985,14 @@ "@babel/types": "^7.23.0", "debug": "^4.1.0", "globals": "^11.1.0" + }, + "dependencies": { + "globals": { + "version": "11.12.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", + "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", + "dev": true + } } }, "@babel/types": { @@ -8756,6 +8800,12 @@ "@babel/types": "^7.20.7" } }, + "@types/core-js": { + "version": "2.5.8", + "resolved": "https://registry.npmjs.org/@types/core-js/-/core-js-2.5.8.tgz", + "integrity": "sha512-VgnAj6tIAhJhZdJ8/IpxdatM8G4OD3VWGlp6xIxUGENZlpbob9Ty4VVdC1FIEp0aK6DBscDDjyzy5FB60TuNqg==", + "dev": true + }, "@types/eslint": { "version": "9.6.1", "resolved": "https://registry.npmjs.org/@types/eslint/-/eslint-9.6.1.tgz", @@ -9408,6 +9458,12 @@ "integrity": "sha512-Kvp459HrV2FEJ1CAsi1Ku+MY3kasH19TFykTz2xWmMeq6bk2NU3XXvfJ+Q61m0xktWwt+1HSYf3JZsTms3aRJg==", "dev": true }, + "core-js": { + "version": "3.38.1", + "resolved": "https://registry.npmjs.org/core-js/-/core-js-3.38.1.tgz", + "integrity": "sha512-OP35aUorbU3Zvlx7pjsFdu1rGNnD4pgw/CWoYzRY3t2EzoVT7shKHY1dlAy3f41cGIO7ZDPQimhGFTlEYkG/Hw==", + "dev": true + }, "create-jest": { "version": "29.7.0", "resolved": "https://registry.npmjs.org/create-jest/-/create-jest-29.7.0.tgz", @@ -10180,9 +10236,9 @@ } }, "globals": { - "version": "11.12.0", - "resolved": "https://registry.npmjs.org/globals/-/globals-11.12.0.tgz", - "integrity": "sha512-WOBp/EEGUiIsJSp7wcv/y6MO+lV9UoncWqxuFfm8eBwzWNgyfBd6Gz+IeKQ9jCmyhoH99g15M3T+QaVHFjizVA==", + "version": "15.9.0", + "resolved": "https://registry.npmjs.org/globals/-/globals-15.9.0.tgz", + "integrity": "sha512-SmSKyLLKFbSr6rptvP8izbyxJL4ILwqO9Jg23UA0sDlGlu58V59D1//I3vlc0KJphVdUR7vMjHIplYnzBxorQA==", "dev": true }, "globby": { diff --git a/package.json b/package.json index 217122d..3adc905 100644 --- a/package.json +++ b/package.json @@ -5,7 +5,8 @@ "main": "index.js", "files": [ "**/*.js", - "**/*.d.ts" + "**/*.d.ts", + "!demos/**" ], "scripts": { "build": "run-s format:check eslint clean tsc", @@ -41,11 +42,14 @@ }, "devDependencies": { "@eslint/js": "^9.9.1", + "@types/core-js": "^2.5.8", "@types/eslint__js": "^8.42.3", "@types/jest": "^29.5.12", "@types/node": "^22.5.3", + "core-js": "^3.38.1", "del-cli": "^5.1.0", "eslint": "^9.9.1", + "globals": "^15.9.0", "jest": "^29.7.0", "npm-run-all": "^4.1.5", "prettier": "^3.3.3", diff --git a/src/async-iterator-stream.test.ts b/src/async-iterator-stream.test.ts new file mode 100644 index 0000000..06722bd --- /dev/null +++ b/src/async-iterator-stream.test.ts @@ -0,0 +1,44 @@ +import "./factories"; +import { iteratorInterval } from "./sources/interval"; +import { logger } from "./behavior.test"; + +test("some() consumes iterator", async () => { + // See https://github.com/tc39/proposal-iterator-helpers?tab=readme-ov-file#somefn + async function* naturals() { + let i = 0; + while (true) { + yield i; + i += 1; + } + } + + const iter = naturals().stream().take(4); + + const result1 = await iter.some((v) => v > 1); // true + const result2 = await iter.some((_) => true); // false, iterator is already consumed. + + expect(result1).toBe(true); + expect(result2).toBe(false); +}); + +test("map() concurrent helpers - results are computed concurrently", async () => { + // See https://github.com/tc39/proposal-async-iterator-helpers?tab=readme-ov-file#concurrency + const log = logger(); + const gen = [50, 10] + .values() + .streamAsync() + .map((n) => + iteratorInterval(n) + .stream() + .map((_) => n) + .peek((v) => log(v)) + .first(), + ); + + const result = (await Promise.all([gen.next(), gen.next()])).map( + (x) => x.value, + ); + + expect(result).toEqual([50, 10]); + expect(log.output).toEqual([10, 50]); +}); diff --git a/src/async-iterator-stream.ts b/src/async-iterator-stream.ts new file mode 100644 index 0000000..b644c33 --- /dev/null +++ b/src/async-iterator-stream.ts @@ -0,0 +1,557 @@ +/** + * An object containing factory methods for AsyncIteratorStream. + * + * ``` + * AsyncIteratorStream.from([1, 2, 3]).map(x => x * 2).forEach(console.log); + * ``` + */ +export const AsyncIteratorStream = { + from: asyncIteratorStreamFrom, +}; + +/** + * An asynchronous stream that produces elements of type `T` on demand. + * + * This is an extension of the built-in `AsyncIterable` protocol. + * + * The operations defined here in AsyncIteratorStream are a superset of the + * operations described in the + * [Async Iterator Helpers](https://github.com/tc39/proposal-async-iterator-helpers) + * proposal. + * + * The behavior of all operations here that correspond to operations in the + * `Async Iterator Helpers` proposal are defined (as best as possible) to match + * exactly with the behavior of the corresponding `Async Iterator Helpers` + * operations. This is intended to allow users of this library to seamlessly + * switch between this library and the `Async Iterator Helpers` once those are + * implemented and available. + * + * However, this library is NOT a polyfill for `Async Iterator Helpers`. To use + * this library, an async iterable iterator or generator needs to be explicitly + * wrapped in an AsyncIteratorStream like in this example: + * + * ``` + * import "streams/asyncGenerator"; + * + * async function* generator() { + * yield* [1, 2, 3]; + * } + * + * generator() // returns an AsyncGenerator + * .stream() // convert to AsyncIteratorStream first! + * .forEach(console.log); // use the AsyncIteratorStream APIs + * ``` + */ +export interface AsyncIteratorStream extends AsyncIterableIterator { + // + // Intermediate operations + // + + /** + * Returns a new stream that skips elements of this stream not matched by the + * `predicate`. + * + * See also [IteratorHelpers#filter](https://github.com/tc39/proposal-iterator-helpers#filterfiltererfn). + * + * @param predicate a function that decides whether to include each element + * in the new stream (true) or to exclude the element (false) + */ + filter(predicate: (_: T) => boolean): AsyncIteratorStream; + + /** + * Returns a new stream that transforms each element of this stream + * using the provided function. + * + * See also [IteratorHelpers#map](https://github.com/tc39/proposal-iterator-helpers#mapmapperfn). + * + * @param transform a function to apply to each element of this stream + */ + map(transform: (_: T) => U): AsyncIteratorStream; + + /** + * Like `map` but the `transform` is an async function that returns a Promise + * and is awaited before producing the next transformed element. + * + * @param transform an async function to apply to each element of this stream + */ + mapAwait(transform: (_: T) => Promise): AsyncIteratorStream; + + /** + * + * See also [IteratorHelpers#flatMap](https://github.com/tc39/proposal-iterator-helpers#flatmapmapperfn). + * + * @param transform + */ + flatMap(transform: (_: T) => AsyncIterable): AsyncIteratorStream; + + batch(batchSize: number): AsyncIteratorStream; + + /** + * Returns a new stream that produces up to the first `limit` number of + * elements of this stream. + * + * See also [IteratorHelpers#take](https://github.com/tc39/proposal-iterator-helpers#takelimit). + * + * @param limit the maximum number of items to produce + */ + take(limit: number): AsyncIteratorStream; + + /** + * + * See also [IteratorHelpers#drop](https://github.com/tc39/proposal-iterator-helpers#droplimit). + * + * @param n + */ + drop(n: number): AsyncIteratorStream; + + dropWhile(predicate: (_: T) => boolean): AsyncIteratorStream; + + takeWhile(predicate: (_: T) => boolean): AsyncIteratorStream; + + peek(observer: (_: T) => void): AsyncIteratorStream; + + // + // Terminal operations + // + + /** + * + * See also [IteratorHelpers#forEach](https://github.com/tc39/proposal-iterator-helpers#foreachfn). + * + * @param block + */ + forEach(block: (_: T) => unknown | Promise): Promise; + collect( + container: A, + accumulator: (a: A, t: T) => void, + finisher: (_: A) => R, + ): Promise; + reduceLeft(initial: R, accumulator: (r: R, t: T) => R): Promise; + + /** + * + * See also [IteratorHelpers#reduce](https://github.com/tc39/proposal-iterator-helpers#reducereducer--initialvalue-). + * + * @param accumulator + * @param initial + */ + reduce(accumulator: (a: T, b: T) => T, initial?: T): Promise; + + /** + * Like {@link AsyncIteratorStream#reduce()} but returns `undefined` if this stream is + * empty instead of throwing `TypeError`. + * + * @param accumulator + * @param initial + */ + fold(accumulator: (a: T, b: T) => T, initial?: T): Promise; + + /** + * + * See also [IteratorHelpers#every](https://github.com/tc39/proposal-iterator-helpers#everyfn). + * + * @param predicate + */ + every(predicate: (_: T) => boolean): Promise; + + /** + * See also [IteratorHelpers#some](https://github.com/tc39/proposal-iterator-helpers#somefn). + * + * @param predicate + */ + some(predicate: (_: T) => boolean): Promise; + + none(predicate: (_: T) => boolean): Promise; + + count(): Promise; + + /** + * Returns the first element that matches the predicate. + * + * This is the same as the {@link first} method except that the predicate is + * required. + * + * See also [IteratorHelpers#find](https://github.com/tc39/proposal-iterator-helpers#findfn). + * + * @param predicate + */ + find(predicate: (_: T) => boolean): Promise; + + first(predicate?: (_: T) => boolean): Promise; + last(predicate?: (_: T) => boolean): Promise; + max(comparator: (a: T, b: T) => number): Promise; + min(comparator: (a: T, b: T) => number): Promise; + + /** + * See also [IteratorHelpers#toArray](https://github.com/tc39/proposal-iterator-helpers#toarray). + */ + toArray(): Promise; +} + +// +// AsyncIteratorStream Implementation +// + +class AsyncIteratorStreamOfIterator + implements AsyncIteratorStream, AsyncIterableIterator +{ + constructor(private readonly iterator: AsyncIterator) {} + + stream() { + return this; + } + + // The AsyncIterator protocol + next(...args: [] | [undefined]) { + return this.iterator.next(...args); + } + + readonly return = this.iterator.return + ? (value?: unknown) => { + return this.iterator.return!(value); + } + : undefined; + + readonly throw = this.iterator.throw + ? (e?: unknown) => { + return this.iterator.throw!(e); + } + : undefined; + + [Symbol.asyncIterator]() { + return this; + } + + filter(predicate: (_: T) => boolean): AsyncIteratorStream { + async function* filtered(it: AsyncIteratorStream) { + for await (const v of it) { + if (predicate(v)) { + yield v; + } + } + } + return new AsyncIteratorStreamOfIterator(filtered(this)); + } + + map(transform: (_: T) => U): AsyncIteratorStream { + function mapResult(sourceResult) { + return new Promise>((resolve, reject) => { + Promise.resolve(sourceResult).then((result) => { + if (result.done) { + resolve(result); + } else { + // transform could return a Promise... + const transformed = transform(result.value); + Promise.resolve(transformed).then( + (value) => resolve({ value, done: false }), + reject, + ); + } + }, reject); + }); + } + + const nextMapped = (...args: [] | [undefined]) => { + const sourceResult = this.iterator.next(...args); + return mapResult(sourceResult); + }; + + const mappedIterator = { next: nextMapped } as AsyncIterator; + + if (this.iterator.return) { + mappedIterator.return = (value?) => { + const sourceResult = this.iterator.return!(value); + return mapResult(sourceResult); + }; + } + + if (this.iterator.throw) { + mappedIterator.throw = (e?) => { + const sourceResult = this.iterator.throw!(e); + return mapResult(sourceResult); + }; + } + return new AsyncIteratorStreamOfIterator(mappedIterator); + } + + mapAwait(transform: (_: T) => Promise): AsyncIteratorStream { + async function* mapAwaited(it: AsyncIteratorStream) { + for await (const v of it) { + yield transform(v); + } + } + return new AsyncIteratorStreamOfIterator(mapAwaited(this)); + } + + flatMap(transform: (_: T) => AsyncIterable): AsyncIteratorStream { + async function* flatMapped(it: AsyncIteratorStream) { + for await (const nested of it) { + yield* transform(nested); + } + } + return new AsyncIteratorStreamOfIterator(flatMapped(this)); + } + + batch(batchSize: number): AsyncIteratorStream { + if (batchSize < 1) { + throw new Error("batchSize should be positive"); + } + + async function* batched(it: AsyncIteratorStream) { + let acc: T[] = []; + for await (const v of it) { + acc.push(v); + if (acc.length === batchSize) { + yield acc; + acc = []; + } + } + if (acc.length > 0) { + yield acc; + } + } + return new AsyncIteratorStreamOfIterator(batched(this)); + } + + take(maxSize: number): AsyncIteratorStream { + async function* limited(it: AsyncIteratorStream) { + let count = 0; + if (count >= maxSize) { + return; + } + for await (const v of it) { + yield v; + count += 1; + if (count >= maxSize) { + return; + } + } + } + return new AsyncIteratorStreamOfIterator(limited(this)); + } + + drop(n: number): AsyncIteratorStream { + async function* skipped(it: AsyncIteratorStream) { + let count = 0; + for await (const v of it) { + if (count >= n) { + yield v; + } + count += 1; + } + } + return new AsyncIteratorStreamOfIterator(skipped(this)); + } + + dropWhile(predicate: (_: T) => boolean): AsyncIteratorStream { + async function* droppedWhile(it: AsyncIteratorStream) { + let dropping = true; + for await (const v of it) { + dropping = dropping && predicate(v); + if (!dropping) { + yield v; + } + } + } + return new AsyncIteratorStreamOfIterator(droppedWhile(this)); + } + + takeWhile(predicate: (_: T) => boolean): AsyncIteratorStream { + async function* takenWhile(it: AsyncIteratorStream) { + for await (const v of it) { + if (!predicate(v)) { + return; + } + yield v; + } + } + return new AsyncIteratorStreamOfIterator(takenWhile(this)); + } + + peek(observer: (_: T) => void): AsyncIteratorStream { + async function* peeked(it: AsyncIteratorStream) { + for await (const v of it) { + observer(v); + yield v; + } + } + return new AsyncIteratorStreamOfIterator(peeked(this)); + } + + async forEach(block: (_: T) => unknown | Promise): Promise { + for await (const v of this) { + await block(v); + } + } + + async collect( + container: A, + accumulator: (a: A, t: T) => void, + finisher: (_: A) => R, + ): Promise { + for await (const v of this) { + accumulator(container, v); + } + return finisher(container); + } + + async reduceLeft(initial: R, reducer: (r: R, t: T) => R): Promise { + let result = initial; + for await (const v of this) { + result = reducer(result, v); + } + return result; + } + + async every(predicate: (_: T) => boolean): Promise { + for await (const v of this) { + if (!(await predicate(v))) { + return false; + } + } + return true; + } + + async some(predicate: (_: T) => boolean): Promise { + for await (const v of this) { + if (await predicate(v)) { + return true; + } + } + return false; + } + + async none(predicate: (_: T) => boolean): Promise { + for await (const v of this) { + if (await predicate(v)) { + return false; + } + } + return true; + } + + async count(): Promise { + let count = 0; + for await (const _ of this) { + count += 1; + } + return count; + } + + async find(predicate: (_: T) => boolean): Promise { + for await (const v of this) { + if (await predicate(v)) { + return v; + } + } + return undefined; + } + + async first( + predicate: (_: T) => boolean = (_) => true, + ): Promise { + for await (const v of this) { + if (await predicate(v)) { + return v; + } + } + return undefined; + } + + async last( + predicate: (_: T) => boolean = (_) => true, + ): Promise { + let result: T | undefined; + for await (const v of this) { + if (await predicate(v)) { + result = v; + } + } + return result; + } + + async max(comparator: (a: T, b: T) => number): Promise { + let result: T | undefined; + let firstItem = true; + for await (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = comparator(result!, v) > 0 ? result : v; + } + } + return result; + } + + async min(comparator: (a: T, b: T) => number): Promise { + let result: T | undefined; + let firstItem = true; + for await (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = comparator(result!, v) < 0 ? result : v; + } + } + return result; + } + + async reduce(adder: (a: T, b: T) => T, initial?: T): Promise { + const hasInitial = arguments.length >= 2; + let firstItem = !hasInitial; + let result = initial; + for await (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = adder(result!, v); + } + } + if (firstItem) { + throw new TypeError("reduce without initial value but stream is empty"); + } + return result!; + } + + async fold(adder: (a: T, b: T) => T, initial?: T): Promise { + const hasInitial = arguments.length >= 2; + let firstItem = !hasInitial; + let result = initial; + for await (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = adder(result!, v); + } + } + return result; + } + + async toArray(): Promise { + const result = [] as T[]; + for await (const v of this) { + result.push(v); + } + return result; + } +} + +// +// AsyncIteratorStream Factory +// + +function asyncIteratorStreamFrom( + it: Iterable | Iterator | AsyncIterable | AsyncIterator, +): AsyncIteratorStream { + if (typeof it[Symbol.asyncIterator] === "function") { + return new AsyncIteratorStreamOfIterator(it[Symbol.asyncIterator]()); + } + if (typeof it[Symbol.iterator] === "function") { + return new AsyncIteratorStreamOfIterator(it[Symbol.iterator]()); + } + return new AsyncIteratorStreamOfIterator(it as AsyncIterator); +} diff --git a/src/behavior.test.ts b/src/behavior.test.ts index 5a75e1e..57b6e33 100644 --- a/src/behavior.test.ts +++ b/src/behavior.test.ts @@ -1,8 +1,11 @@ // Tests that demonstrate various non-obvious behaviors -// of async iterators and generator. +// of Async Iterators and Generator functions. // -function logger() { +import "./factories/async-iterator"; +import { iteratorInterval } from "./sources/interval"; + +export function logger() { const output: unknown[] = []; const log = (...s: unknown[]) => { @@ -335,3 +338,46 @@ test("async generator does not pass through next() and return() arguments", asyn expect(await it2.return("YAY")).toEqual({ value: "YAY", done: true }); expect(log.output).toEqual(["NEXT", "RETURN", undefined]); }); + +test("async generator awaits promises before yielding", async () => { + const log = logger(); + + async function* generator() { + log("generate first"); + const first = iteratorInterval(40) + .stream() + .map((_) => "first") + .peek(() => log("first produced")) + .first(); + yield first; + log("generate second"); + const second = iteratorInterval(20) + .stream() + .map((_) => "second") + .peek(() => log("second produced")) + .first(); + yield second; + log("generate done"); + } + + log("start"); + const result = generator(); + const p1 = result.next(); + log("first requested"); + const p2 = result.next(); + log("second requested"); + const resolved = (await Promise.all([p1, p2])).map((r) => r.value); + log("all done"); + + expect(resolved).toEqual(["first", "second"]); + expect(log.output).toEqual([ + "start", + "generate first", + "first requested", + "second requested", + "first produced", + "generate second", + "second produced", + "all done", + ]); +}); diff --git a/src/factories/async-iterator.test.ts b/src/factories/async-iterator.test.ts new file mode 100644 index 0000000..1493e9f --- /dev/null +++ b/src/factories/async-iterator.test.ts @@ -0,0 +1,34 @@ +import "./async-iterator"; + +test("AsyncIterator.stream works with async generator", async () => { + async function* gen() { + yield* [1, 2, 3]; + } + + const r = await gen() + .stream() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([2, 4, 6]); +}); + +test("AsyncIterator.stream does NOT work with non-async generator", async () => { + function* gen() { + yield* [1, 2, 3]; + } + + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const f = () => (gen() as any).stream(); + + expect(f).toThrow(TypeError); +}); + +test("AsyncIterator.stream does NOT work with Array.values()", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const gen = [1, 2, 3].values() as any; + + const f = () => gen.stream(); + + expect(f).toThrow(TypeError); +}); diff --git a/src/factories/async-iterator.ts b/src/factories/async-iterator.ts new file mode 100644 index 0000000..e0f2ec5 --- /dev/null +++ b/src/factories/async-iterator.ts @@ -0,0 +1,19 @@ +import { AsyncIteratorStream } from "../async-iterator-stream"; + +declare global { + interface AsyncGenerator { + stream(): AsyncIteratorStream; + } + + interface AsyncIterableIterator { + stream(): AsyncIteratorStream; + } +} + +const AsyncIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(Object.getPrototypeOf((async function* () {})())), +); + +AsyncIteratorPrototype.stream = function () { + return AsyncIteratorStream.from(this); +}; diff --git a/src/factories/index.ts b/src/factories/index.ts new file mode 100644 index 0000000..d2fb416 --- /dev/null +++ b/src/factories/index.ts @@ -0,0 +1,4 @@ +import "./async-iterator"; +import "./iterator"; +import "./readable"; +import "./string"; diff --git a/src/factories/iterator.test.ts b/src/factories/iterator.test.ts new file mode 100644 index 0000000..796985e --- /dev/null +++ b/src/factories/iterator.test.ts @@ -0,0 +1,166 @@ +import "./iterator"; + +test("the Array prototype is not modified", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const arr = [1, 2, 3] as any; + + const f = () => arr.streamAsync(); + + expect(f).toThrow(TypeError); +}); + +test("the String prototype is not modified", async () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const str = "abc" as any; + + const f = () => str.streamAsync(); + + expect(f).toThrow(TypeError); +}); + +test("IterableIterator.streamAsync works with generator functions", async () => { + function* gen() { + yield* [1, 2, 3]; + } + + const r = await gen() + .streamAsync() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([2, 4, 6]); +}); + +test("IterableIterator.streamAsync works with Array.keys()", async () => { + const gen = [1, 2, 3].keys(); + + const r = await gen + .streamAsync() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([0, 2, 4]); +}); + +test("IterableIterator.streamAsync works with Array.values()", async () => { + const gen = [1, 2, 3].values(); + + const r = await gen + .streamAsync() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([2, 4, 6]); +}); + +test("IterableIterator.streamAsync works with TypedArray.keys()", async () => { + const gen = Float64Array.of(1, 2, 3).keys(); + + const r = await gen + .streamAsync() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([0, 2, 4]); +}); + +test("IterableIterator.streamAsync works with TypedArray.values()", async () => { + const gen = Float64Array.of(1, 2, 3).values(); + + const r = await gen + .streamAsync() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([2, 4, 6]); +}); + +test("IterableIterator.streamAsync works with Map.keys()", async () => { + const map = new Map(); + map.set(1, "a"); + map.set(2, "b"); + map.set(3, "c"); + const gen = map.keys(); + + const r = await gen + .streamAsync() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([2, 4, 6]); +}); + +test("IterableIterator.streamAsync works with Map.values()", async () => { + const map = new Map(); + map.set(1, "a"); + map.set(2, "b"); + map.set(3, "c"); + const gen = map.values(); + + const r = await gen + .streamAsync() + .map((x) => x.toUpperCase()) + .toArray(); + + expect(r).toEqual(["A", "B", "C"]); +}); + +test("IterableIterator.streamAsync works with Map.entries()", async () => { + const map = new Map([ + [1, "a"], + [2, "b"], + [3, "c"], + ]); + const gen = map.entries(); + + const r = await gen + .streamAsync() + .map(([k, v]) => [k + 1, v.toUpperCase()]) + .toArray(); + + expect(r).toEqual([ + [2, "A"], + [3, "B"], + [4, "C"], + ]); +}); + +test("IterableIterator.streamAsync works with Set.keys()", async () => { + const set = new Set([1, 2, 3]); + const gen = set.keys(); + + const r = await gen + .streamAsync() + .map((x) => x * 2) + .toArray(); + + expect(r).toEqual([2, 4, 6]); +}); + +test("IterableIterator.streamAsync works with Set.values()", async () => { + const set = new Set([1, 2, 3]); + const gen = set.values(); + + const r = await gen + .streamAsync() + .map((x) => x * 3) + .toArray(); + + expect(r).toEqual([3, 6, 9]); +}); + +test("IterableIterator.streamAsync works with Set.entries()", async () => { + const set = new Set([1, 2, 3]); + const gen = set.entries(); + + const r = await gen + .streamAsync() + .map(([k, v]) => [k * 2, v * 3]) + .toArray(); + + expect(r).toEqual([ + [2, 3], + [4, 6], + [6, 9], + ]); +}); diff --git a/src/factories/iterator.ts b/src/factories/iterator.ts new file mode 100644 index 0000000..ff97e37 --- /dev/null +++ b/src/factories/iterator.ts @@ -0,0 +1,26 @@ +import { AsyncIteratorStream } from "../async-iterator-stream"; +import { IteratorStream } from "../iterator-stream"; + +declare global { + interface Generator { + stream(): IteratorStream; + streamAsync(): AsyncIteratorStream; + } + + interface IterableIterator { + stream(): IteratorStream; + streamAsync(): AsyncIteratorStream; + } +} + +const IterableIteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf([].values()), +); + +IterableIteratorPrototype.stream = function () { + return IteratorStream.from(this); +}; + +IterableIteratorPrototype.streamAsync = function () { + return AsyncIteratorStream.from(this); +}; diff --git a/src/factories/readable.ts b/src/factories/readable.ts new file mode 100644 index 0000000..9f58a9a --- /dev/null +++ b/src/factories/readable.ts @@ -0,0 +1,30 @@ +import { Readable, Duplex } from "node:stream"; +import { readableLines } from "../sources/readable-lines2"; +import { readableChunks } from "../sources/readable-chunks"; + +declare module "node:stream" { + interface Readable { + chunks(encoding?: BufferEncoding): AsyncIterableIterator; + lines(encoding?: BufferEncoding): AsyncIterableIterator; + } + interface Duplex { + chunks(encoding?: BufferEncoding): AsyncIterableIterator; + lines(encoding?: BufferEncoding): AsyncIterableIterator; + } +} + +Readable.prototype.chunks = function (encoding?: BufferEncoding) { + return readableChunks(this, encoding); +}; + +Readable.prototype.lines = function (encoding?: BufferEncoding) { + return readableLines(this, encoding); +}; + +Duplex.prototype.chunks = function (encoding?: BufferEncoding) { + return readableChunks(this, encoding); +}; + +Duplex.prototype.lines = function (encoding?: BufferEncoding) { + return readableLines(this, encoding); +}; diff --git a/src/factories/string.ts b/src/factories/string.ts new file mode 100644 index 0000000..df9476f --- /dev/null +++ b/src/factories/string.ts @@ -0,0 +1,34 @@ +export {}; + +declare global { + interface String { + chars(): Iterable; + charCodes(): Iterable; + codePoints(): Iterable; + } +} + +String.prototype.chars = function () { + function* stringChars(str: string) { + for (let i = 0; i < str.length; i++) { + yield str.charAt(i); + } + } + return stringChars(this as string); +}; + +String.prototype.charCodes = function () { + function* stringCharCodes(str: string) { + for (let i = 0; i < str.length; i++) { + yield str.charCodeAt(i); + } + } + return stringCharCodes(this as string); +}; + +String.prototype.codePoints = function () { + function* stringCodePoints(str: string) { + yield* str; + } + return stringCodePoints(this as string); +}; diff --git a/src/generator.test.ts b/src/generator.test.ts new file mode 100644 index 0000000..1920e39 --- /dev/null +++ b/src/generator.test.ts @@ -0,0 +1,100 @@ +// Tests that determine various behaviors of Iterator Helpers +// + +import "./factories/async-iterator"; +import { iteratorInterval } from "./sources/interval"; + +describe("tests that require Iterator Helpers", () => { + // Load Iterator Helpers from core-js if not available. + const IteratorPrototype = Object.getPrototypeOf( + Object.getPrototypeOf(Object.getPrototypeOf((function* () {})())), + ); + if (typeof IteratorPrototype.map !== "function") { + import("core-js/full/iterator"); + } + + test("map works as expected", () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const a: any = [1, 2, 3].values() as any; + + const s = Array.from(a.map((it) => it * it)); + + expect(s).toEqual([1, 4, 9]); + }); + + test("filter and map work as expected", () => { + function* gen() { + yield 1; + yield 2; + yield 3; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const a = gen() as any; + + const s = Array.from(a.filter((x) => x > 1).map((it) => it * it)); + + expect(s).toEqual([4, 9]); + }); + + test("reduce of empty iterator throws TypeError", () => { + function* gen() { + // Empty iterator + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const a = gen() as any; + + const s = () => a.reduce((a, b) => a + b); + + expect(s).toThrow(TypeError); + }); + + test("reduce of empty iterator with undefined initial element returns undefined", () => { + function* gen() { + // Empty iterator + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const a = gen() as any; + + const s = a.reduce((a, b) => a + b, undefined); + + expect(s).toBeUndefined(); + }); + + test("reduce of single-element iterator returns single element", () => { + function* gen() { + yield 42; + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const a = gen() as any; + + const s = a.reduce((a, b) => a + b); + + expect(s).toBe(42); + }); + + test("promises are not awaited by generators", () => { + function* gen() { + yield iteratorInterval(10).stream().first(); + yield iteratorInterval(10).stream().first(); + } + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const a = gen() as any; + + const result = a + .map((x) => ({ + result: x, + })) + .toArray(); + + expect(result).toHaveLength(2); + expect(result[0].result).toBeInstanceOf(Promise); + expect(result[1].result).toBeInstanceOf(Promise); + }); + + test("find requires a predicate", () => { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const a = [].values() as any; + + expect(() => a.find()).toThrow(TypeError); + }); +}); diff --git a/src/generator.test22.ts b/src/generator.test22.ts deleted file mode 100644 index cba0078..0000000 --- a/src/generator.test22.ts +++ /dev/null @@ -1,63 +0,0 @@ -// These tests are disabled by default. Run these tests by: -// -// npm test -- --testRegex test22 -// - -test("map works as expected", () => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const a: any = [1, 2, 3].values() as any; - - const s = Array.from(a.map((it) => it * it)); - - expect(s).toEqual([1, 4, 9]); -}); - -test("filter and map work as expected", () => { - function* gen() { - yield 1; - yield 2; - yield 3; - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const a = gen() as any; - - const s = Array.from(a.filter((x) => x > 1).map((it) => it * it)); - - expect(s).toEqual([4, 9]); -}); - -test("reduce of empty iterator throws TypeError", () => { - function* gen() { - // Empty iterator - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const a = gen() as any; - - const s = () => a.reduce((a, b) => a + b); - - expect(s).toThrow(TypeError); -}); - -test("reduce of empty iterator with undefined initial element returns undefined", () => { - function* gen() { - // Empty iterator - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const a = gen() as any; - - const s = a.reduce((a, b) => a + b, undefined); - - expect(s).toBeUndefined(); -}); - -test("reduce of single-element iterator returns single element", () => { - function* gen() { - yield 42; - } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const a = gen() as any; - - const s = a.reduce((a, b) => a + b); - - expect(s).toBe(42); -}); diff --git a/src/index.test.ts b/src/index.test.ts deleted file mode 100644 index be1bbe3..0000000 --- a/src/index.test.ts +++ /dev/null @@ -1,11 +0,0 @@ -test("some() consumes iterator", async () => { - // See https://github.com/tc39/proposal-iterator-helpers?tab=readme-ov-file#somefn -}); - -test("map() concurrent helpers - results are computed concurrently", async () => { - // See https://github.com/tc39/proposal-async-iterator-helpers?tab=readme-ov-file#concurrency -}); - -test("map() passing the protocol", async () => { - // See https://github.com/tc39/proposal-async-iterator-helpers/blob/main/DETAILS.md#passing-the-protocol -}); diff --git a/src/index.ts b/src/index.ts index a941914..0ad7034 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,554 +1,2 @@ -/** - * An asynchronous stream that produces elements of type `T` on demand. - * - * This is an extension of the built-in `AsyncIterable` protocol. - * - * The operations defined here in AsyncStream are a superset of the operations - * described in the [Async Iterator Helpers](https://github.com/tc39/proposal-async-iterator-helpers) - * proposal. - * - * The behavior of all operations here that correspond to operations in the - * `Async Iterator Helpers` proposal are defined (as best as possible) to match - * exactly with the behavior of the corresponding `Async Iterator Helpers` - * operations. This is intended to allow users of this library to seamlessly - * switch between this library and the `Async Iterator Helpers` once those are - * implemented and available. - * - * However, this library is NOT a polyfill for `Async Iterator Helpers`. To use - * this library, an async iterable iterator or generator needs to be explicitly - * wrapped in an AsyncStream like in this example: - * - * ``` - * import "streams/asyncGenerator"; - * - * async function* generator() { - * yield* [1, 2, 3]; - * } - * - * generator() // returns an AsyncGenerator - * .stream() // convert to AsyncStream first! - * .forEach(console.log); // use the AsyncStream APIs - * ``` - */ -export interface AsyncStream extends AsyncIterableIterator { - // - // Intermediate operations - // - - /** - * Returns a new stream that skips elements of this stream not matched by the - * `predicate`. - * - * See also [IteratorHelpers#filter](https://github.com/tc39/proposal-iterator-helpers#filterfiltererfn). - * - * @param predicate a function that decides whether to include each element - * in the new stream (true) or to exclude the element (false) - */ - filter(predicate: (_: T) => boolean): AsyncStream; - - /** - * Returns a new stream that transforms each element of this stream - * using the provided function. - * - * See also [IteratorHelpers#map](https://github.com/tc39/proposal-iterator-helpers#mapmapperfn). - * - * @alias - * @param transform a function to apply to each element of this stream - */ - map(transform: (_: T) => U): AsyncStream; - - /** - * Like `map` but the `transform` is an async function that returns a Promise - * and is awaited before producing the next transformed element. - * - * @param transform an async function to apply to each element of this stream - */ - mapAwait(transform: (_: T) => Promise): AsyncStream; - - /** - * - * See also [IteratorHelpers#flatMap](https://github.com/tc39/proposal-iterator-helpers#flatmapmapperfn). - * - * @param transform - */ - flatMap(transform: (_: T) => AsyncStream): AsyncStream; - - flatMapAwait(transform: (_: T) => Promise>): AsyncStream; - - batch(batchSize: number): AsyncStream; - - /** - * Returns a new stream that produces up to the first `limit` number of - * elements of this stream. - * - * See also [IteratorHelpers#take](https://github.com/tc39/proposal-iterator-helpers#takelimit). - * - * @param limit the maximum number of items to produce - */ - take(limit: number): AsyncStream; - - /** - * An alias for `take`. - * - * @alias take - * @param maxSize the maximum number of items to produce - */ - limit(maxSize: number): AsyncStream; - - /** - * - * See also [IteratorHelpers#drop](https://github.com/tc39/proposal-iterator-helpers#droplimit). - * - * @param n - */ - drop(n: number): AsyncStream; - - /** - * An alias for `drop`. - * - * @alias drop - * @param n the number of elements to skip from the start of this stream - */ - skip(n: number): AsyncStream; - - dropWhile(predicate: (_: T) => boolean): AsyncStream; - - takeWhile(predicate: (_: T) => boolean): AsyncStream; - - peek(observer: (_: T) => void): AsyncStream; - - // - // Terminal operations - // - - /** - * - * See also [IteratorHelpers#forEach](https://github.com/tc39/proposal-iterator-helpers#foreachfn). - * - * @param block - */ - forEach(block: (_: T) => unknown | Promise): Promise; - collect( - container: A, - accumulator: (a: A, t: T) => void, - finisher: (_: A) => R, - ): Promise; - reduceLeft(initial: R, accumulator: (r: R, t: T) => R): Promise; - - /** - * - * See also [IteratorHelpers#reduce](https://github.com/tc39/proposal-iterator-helpers#reducereducer--initialvalue-). - * - * @param accumulator - * @param initial - */ - reduce(accumulator: (a: T, b: T) => T, initial?: T): Promise; - - /** - * Like {@link AsyncStream#reduce()} but returns `undefined` if this stream is - * empty instead of throwing `TypeError`. - * - * @param accumulator - * @param initial - */ - fold(accumulator: (a: T, b: T) => T, initial?: T): Promise; - - /** - * - * See also [IteratorHelpers#every](https://github.com/tc39/proposal-iterator-helpers#everyfn). - * - * @param predicate - */ - every(predicate: (_: T) => boolean): Promise; - - /** - * An alias for `every`. - * - * @param predicate - */ - all(predicate: (_: T) => boolean): Promise; - - /** - * See also [IteratorHelpers#some](https://github.com/tc39/proposal-iterator-helpers#somefn). - * - * @param predicate - */ - some(predicate: (_: T) => boolean): Promise; - - /** - * An alias for `some`. - * - * @param predicate - */ - any(predicate: (_: T) => boolean): Promise; - none(predicate: (_: T) => boolean): Promise; - count(): Promise; - - /** - * An alias for `first`. - * - * See also [IteratorHelpers#find](https://github.com/tc39/proposal-iterator-helpers#findfn). - * - * @param predicate - */ - find(predicate: (_: T) => boolean): Promise; - - first(predicate: (_: T) => boolean): Promise; - last(predicate: (_: T) => boolean): Promise; - max(comparator: (a: T, b: T) => number): Promise; - min(comparator: (a: T, b: T) => number): Promise; - - /** - * See also [IteratorHelpers#toArray](https://github.com/tc39/proposal-iterator-helpers#toarray). - */ - toArray(): Promise; -} - -class AsyncStreamOfIterator - implements AsyncStream, AsyncIterableIterator -{ - constructor(private readonly iterator: AsyncIterator) {} - - // The AsyncIterator protocol - next(...args: [] | [undefined]) { - return this.iterator.next(...args); - } - - readonly return = this.iterator.return - ? (value?: unknown) => { - return this.iterator.return!(value); - } - : undefined; - - readonly throw = this.iterator.throw - ? (e?: unknown) => { - return this.iterator.throw!(e); - } - : undefined; - - // Aliases - readonly limit = this.take; - readonly skip = this.drop; - readonly find = this.first; - readonly all = this.every; - readonly any = this.some; - - /** - * @returns the AsyncIterator wrapped by this AsyncStream - */ - [Symbol.asyncIterator]() { - return this; - } - - filter(predicate: (_: T) => boolean): AsyncStream { - async function* filtered(it: AsyncStream) { - for await (const v of it) { - if (predicate(v)) { - yield v; - } - } - } - return new AsyncStreamOfIterator(filtered(this)); - } - - map(transform: (_: T) => U): AsyncStream { - async function* mapped(it: AsyncStream) { - for await (const v of it) { - yield transform(v); - } - } - return new AsyncStreamOfIterator(mapped(this)); - } - - mapAwait(transform: (_: T) => Promise): AsyncStream { - async function* mapAwaited(it: AsyncStream) { - for await (const v of it) { - yield await transform(v); - } - } - return new AsyncStreamOfIterator(mapAwaited(this)); - } - - flatMap(transform: (_: T) => AsyncStream): AsyncStream { - async function* flatMapped(it: AsyncStream) { - for await (const nested of it) { - yield* transform(nested); - } - } - return new AsyncStreamOfIterator(flatMapped(this)); - } - - flatMapAwait( - transform: (_: T) => Promise>, - ): AsyncStream { - async function* flatMapAwaited(it: AsyncStream) { - for await (const nested of it) { - yield* await transform(nested); - } - } - return new AsyncStreamOfIterator(flatMapAwaited(this)); - } - - batch(batchSize: number): AsyncStream { - if (batchSize < 1) { - throw new Error("batchSize should be positive"); - } - - async function* batched(it: AsyncStream) { - let acc: T[] = []; - for await (const v of it) { - acc.push(v); - if (acc.length === batchSize) { - yield acc; - acc = []; - } - } - if (acc.length > 0) { - yield acc; - } - } - return new AsyncStreamOfIterator(batched(this)); - } - - take(maxSize: number): AsyncStream { - async function* limited(it: AsyncStream) { - let count = 0; - if (count >= maxSize) { - return; - } - for await (const v of it) { - yield v; - count += 1; - if (count >= maxSize) { - return; - } - } - } - return new AsyncStreamOfIterator(limited(this)); - } - - drop(n: number): AsyncStream { - async function* skipped(it: AsyncStream) { - let count = 0; - for await (const v of it) { - if (count >= n) { - yield v; - } - count += 1; - } - } - return new AsyncStreamOfIterator(skipped(this)); - } - - dropWhile(predicate: (_: T) => boolean): AsyncStream { - async function* droppedWhile(it: AsyncStream) { - let dropping = true; - for await (const v of it) { - dropping = dropping && predicate(v); - if (!dropping) { - yield v; - } - } - } - return new AsyncStreamOfIterator(droppedWhile(this)); - } - - takeWhile(predicate: (_: T) => boolean): AsyncStream { - async function* takenWhile(it: AsyncStream) { - for await (const v of it) { - if (!predicate(v)) { - return; - } - yield v; - } - } - return new AsyncStreamOfIterator(takenWhile(this)); - } - - peek(observer: (_: T) => void): AsyncStream { - async function* peeked(it: AsyncStream) { - for await (const v of it) { - observer(v); - yield v; - } - } - return new AsyncStreamOfIterator(peeked(this)); - } - - async forEach(block: (_: T) => unknown | Promise): Promise { - for await (const v of this) { - await block(v); - } - } - - async collect( - container: A, - accumulator: (a: A, t: T) => void, - finisher: (_: A) => R, - ): Promise { - for await (const v of this) { - accumulator(container, v); - } - return finisher(container); - } - - async reduceLeft(initial: R, reducer: (r: R, t: T) => R): Promise { - let result = initial; - for await (const v of this) { - result = reducer(result, v); - } - return result; - } - - async every(predicate: (_: T) => boolean): Promise { - for await (const v of this) { - if (!(await predicate(v))) { - return false; - } - } - return true; - } - - async some(predicate: (_: T) => boolean): Promise { - for await (const v of this) { - if (await predicate(v)) { - return true; - } - } - return false; - } - - async none(predicate: (_: T) => boolean): Promise { - for await (const v of this) { - if (await predicate(v)) { - return false; - } - } - return true; - } - - async count(): Promise { - let count = 0; - for await (const _ of this) { - count += 1; - } - return count; - } - - async first(predicate: (_: T) => boolean): Promise { - for await (const v of this) { - if (await predicate(v)) { - return v; - } - } - return undefined; - } - - async last(predicate: (_: T) => boolean): Promise { - let result: T | undefined; - for await (const v of this) { - if (await predicate(v)) { - result = v; - } - } - return result; - } - - async max(comparator: (a: T, b: T) => number): Promise { - let result: T | undefined; - let firstItem = true; - for await (const v of this) { - if (firstItem) { - result = v; - firstItem = false; - } else { - result = comparator(result!, v) > 0 ? result : v; - } - } - return result; - } - - async min(comparator: (a: T, b: T) => number): Promise { - let result: T | undefined; - let firstItem = true; - for await (const v of this) { - if (firstItem) { - result = v; - firstItem = false; - } else { - result = comparator(result!, v) < 0 ? result : v; - } - } - return result; - } - - async reduce(adder: (a: T, b: T) => T, initial?: T): Promise { - const hasInitial = arguments.length >= 2; - let firstItem = !hasInitial; - let result = initial; - for await (const v of this) { - if (firstItem) { - result = v; - firstItem = false; - } else { - result = adder(result!, v); - } - } - if (firstItem) { - throw new TypeError("reduce without initial value but stream is empty"); - } - return result!; - } - - async fold(adder: (a: T, b: T) => T, initial?: T): Promise { - const hasInitial = arguments.length >= 2; - let firstItem = !hasInitial; - let result = initial; - for await (const v of this) { - if (firstItem) { - result = v; - firstItem = false; - } else { - result = adder(result!, v); - } - } - return result; - } - - async toArray(): Promise { - const result = [] as T[]; - for await (const v of this) { - result.push(v); - } - return result; - } -} - -// Stream Sources - -export function asyncStreamIterable(itrbl: Iterable): AsyncStream { - async function* iterableSource() { - for (const e of itrbl) { - yield e; - } - } - return new AsyncStreamOfIterator(iterableSource()); -} - -export function streamAsyncIterable( - itrbl: AsyncIterable, -): AsyncStream { - return new AsyncStreamOfIterator(itrbl[Symbol.asyncIterator]()); -} - -export function asyncStream( - it: Iterable | Iterator | AsyncIterable | AsyncIterator, -): AsyncStream { - if (typeof it[Symbol.iterator] === "function") { - return asyncStreamIterable(it as Iterable); - } - if (typeof it[Symbol.asyncIterator] === "function") { - return streamAsyncIterable(it as AsyncIterable); - } - return new AsyncStreamOfIterator(it as AsyncIterator); -} +import "./async-iterator-stream"; +import "./iterator-stream"; diff --git a/src/iterator-stream.ts b/src/iterator-stream.ts new file mode 100644 index 0000000..c2f89cd --- /dev/null +++ b/src/iterator-stream.ts @@ -0,0 +1,510 @@ +import { AsyncIteratorStream } from "./async-iterator-stream"; + +/** + * An object containing factory methods for IteratorStream. + * + * ``` + * IteratorStream.from([1, 2, 3]).map(x => x * 2).forEach(console.log); + * ``` + */ +export const IteratorStream = { + from: iteratorStreamFrom, +}; + +/** + * An asynchronous stream that produces elements of type `T` on demand. + * + * This is an extension of the built-in `AsyncIterable` protocol. + * + * The operations defined here in IteratorStream are a superset of the + * operations described in the + * [Iterator Helpers](https://github.com/tc39/proposal-iterator-helpers) + * proposal. + * + * The behavior of all operations here that correspond to operations in the + * `Iterator Helpers` proposal are defined (as best as possible) to match + * exactly with the behavior of the corresponding `Iterator Helpers` + * operations. This is intended to allow users of this library to seamlessly + * switch between this library and the `Iterator Helpers` once those are + * implemented and available. + * + * However, this library is NOT a polyfill for `Iterator Helpers`. To use + * this library, an iterable iterator or generator needs to be explicitly + * wrapped in an IteratorStream like in this example: + * + * ``` + * import "streams/sync"; + * + * function* generator() { + * yield* [1, 2, 3]; + * } + * + * generator() // returns an AsyncGenerator + * .stream() // convert to IteratorStream first! + * .forEach(console.log); // use the IteratorStream APIs + * ``` + */ +export interface IteratorStream extends IterableIterator { + // + // Intermediate operations + // + + /** + * Returns a new stream that skips elements of this stream not matched by the + * `predicate`. + * + * See also [IteratorHelpers#filter](https://github.com/tc39/proposal-iterator-helpers#filterfiltererfn). + * + * @param predicate a function that decides whether to include each element + * in the new stream (true) or to exclude the element (false) + */ + filter(predicate: (_: T) => boolean): IteratorStream; + + /** + * Returns a new stream that transforms each element of this stream + * using the provided function. + * + * See also [IteratorHelpers#map](https://github.com/tc39/proposal-iterator-helpers#mapmapperfn). + * + * @param transform a function to apply to each element of this stream + */ + map(transform: (_: T) => U): IteratorStream; + + /** + * + * See also [IteratorHelpers#flatMap](https://github.com/tc39/proposal-iterator-helpers#flatmapmapperfn). + * + * @param transform + */ + flatMap(transform: (_: T) => Iterable): IteratorStream; + + batch(batchSize: number): IteratorStream; + + /** + * Returns a new stream that produces up to the first `limit` number of + * elements of this stream. + * + * See also [IteratorHelpers#take](https://github.com/tc39/proposal-iterator-helpers#takelimit). + * + * @param limit the maximum number of items to produce + */ + take(limit: number): IteratorStream; + + /** + * + * See also [IteratorHelpers#drop](https://github.com/tc39/proposal-iterator-helpers#droplimit). + * + * @param n + */ + drop(n: number): IteratorStream; + + dropWhile(predicate: (_: T) => boolean): IteratorStream; + + takeWhile(predicate: (_: T) => boolean): IteratorStream; + + peek(observer: (_: T) => void): IteratorStream; + + // + // Terminal operations + // + + /** + * + * See also [IteratorHelpers#forEach](https://github.com/tc39/proposal-iterator-helpers#foreachfn). + * + * @param block + */ + forEach(block: (_: T) => unknown): void; + collect( + container: A, + accumulator: (a: A, t: T) => void, + finisher: (_: A) => R, + ): R; + reduceLeft(initial: R, accumulator: (r: R, t: T) => R): R; + + /** + * + * See also [IteratorHelpers#reduce](https://github.com/tc39/proposal-iterator-helpers#reducereducer--initialvalue-). + * + * @param accumulator + * @param initial + */ + reduce(accumulator: (a: T, b: T) => T, initial?: T): T; + + /** + * Like {@link IteratorStream#reduce()} but returns `undefined` if this stream is + * empty instead of throwing `TypeError`. + * + * @param accumulator + * @param initial + */ + fold(accumulator: (a: T, b: T) => T, initial?: T): T | undefined; + + /** + * + * See also [IteratorHelpers#every](https://github.com/tc39/proposal-iterator-helpers#everyfn). + * + * @param predicate + */ + every(predicate: (_: T) => boolean): boolean; + + /** + * See also [IteratorHelpers#some](https://github.com/tc39/proposal-iterator-helpers#somefn). + * + * @param predicate + */ + some(predicate: (_: T) => boolean): boolean; + + none(predicate: (_: T) => boolean): boolean; + + count(): number; + + /** + * Returns the first element that matches the predicate. + * + * This is the same as the {@link first} method except that the predicate is + * required. + * + * See also [IteratorHelpers#find](https://github.com/tc39/proposal-iterator-helpers#findfn). + * + * @param predicate + */ + find(predicate: (_: T) => boolean): T | undefined; + + first(predicate?: (_: T) => boolean): T | undefined; + last(predicate?: (_: T) => boolean): T | undefined; + max(comparator: (a: T, b: T) => number): T | undefined; + min(comparator: (a: T, b: T) => number): T | undefined; + + /** + * See also [IteratorHelpers#toArray](https://github.com/tc39/proposal-iterator-helpers#toarray). + */ + toArray(): T[]; +} + +// +// IteratorStream Implementation +// + +class IteratorStreamOfIterator + implements IteratorStream, IterableIterator +{ + constructor(private readonly iterator: Iterator) {} + + stream() { + return IteratorStream.from(this); + } + + streamAsync() { + return AsyncIteratorStream.from(this); + } + + // The AsyncIterator protocol + next(...args: [] | [undefined]) { + return this.iterator.next(...args); + } + + readonly return = this.iterator.return + ? (value?: unknown) => { + return this.iterator.return!(value); + } + : undefined; + + readonly throw = this.iterator.throw + ? (e?: unknown) => { + return this.iterator.throw!(e); + } + : undefined; + + [Symbol.iterator]() { + return this; + } + + filter(predicate: (_: T) => boolean): IteratorStream { + function* filtered(it: IteratorStream) { + for (const v of it) { + if (predicate(v)) { + yield v; + } + } + } + return new IteratorStreamOfIterator(filtered(this)); + } + + map(transform: (_: T) => U): IteratorStream { + function* mapped(it: IteratorStream) { + for (const v of it) { + yield transform(v); + } + } + return new IteratorStreamOfIterator(mapped(this)); + } + + flatMap(transform: (_: T) => Iterable): IteratorStream { + function* flatMapped(it: IteratorStream) { + for (const nested of it) { + yield* transform(nested); + } + } + return new IteratorStreamOfIterator(flatMapped(this)); + } + + batch(batchSize: number): IteratorStream { + if (batchSize < 1) { + throw new Error("batchSize should be positive"); + } + + function* batched(it: IteratorStream) { + let acc: T[] = []; + for (const v of it) { + acc.push(v); + if (acc.length === batchSize) { + yield acc; + acc = []; + } + } + if (acc.length > 0) { + yield acc; + } + } + return new IteratorStreamOfIterator(batched(this)); + } + + take(maxSize: number): IteratorStream { + function* limited(it: IteratorStream) { + let count = 0; + if (count >= maxSize) { + return; + } + for (const v of it) { + yield v; + count += 1; + if (count >= maxSize) { + return; + } + } + } + return new IteratorStreamOfIterator(limited(this)); + } + + drop(n: number): IteratorStream { + function* skipped(it: IteratorStream) { + let count = 0; + for (const v of it) { + if (count >= n) { + yield v; + } + count += 1; + } + } + return new IteratorStreamOfIterator(skipped(this)); + } + + dropWhile(predicate: (_: T) => boolean): IteratorStream { + function* droppedWhile(it: IteratorStream) { + let dropping = true; + for (const v of it) { + dropping = dropping && predicate(v); + if (!dropping) { + yield v; + } + } + } + return new IteratorStreamOfIterator(droppedWhile(this)); + } + + takeWhile(predicate: (_: T) => boolean): IteratorStream { + function* takenWhile(it: IteratorStream) { + for (const v of it) { + if (!predicate(v)) { + return; + } + yield v; + } + } + return new IteratorStreamOfIterator(takenWhile(this)); + } + + peek(observer: (_: T) => void): IteratorStream { + function* peeked(it: IteratorStream) { + for (const v of it) { + observer(v); + yield v; + } + } + return new IteratorStreamOfIterator(peeked(this)); + } + + forEach(block: (_: T) => unknown): void { + for (const v of this) { + block(v); + } + } + + collect( + container: A, + accumulator: (a: A, t: T) => void, + finisher: (_: A) => R, + ): R { + for (const v of this) { + accumulator(container, v); + } + return finisher(container); + } + + reduceLeft(initial: R, reducer: (r: R, t: T) => R): R { + let result = initial; + for (const v of this) { + result = reducer(result, v); + } + return result; + } + + every(predicate: (_: T) => boolean): boolean { + for (const v of this) { + if (!predicate(v)) { + return false; + } + } + return true; + } + + some(predicate: (_: T) => boolean): boolean { + for (const v of this) { + if (predicate(v)) { + return true; + } + } + return false; + } + + none(predicate: (_: T) => boolean): boolean { + for (const v of this) { + if (predicate(v)) { + return false; + } + } + return true; + } + + count(): number { + let count = 0; + for (const _ of this) { + count += 1; + } + return count; + } + + find(predicate: (_: T) => boolean): T | undefined { + for (const v of this) { + if (predicate(v)) { + return v; + } + } + return undefined; + } + + first(predicate: (_: T) => boolean = (_) => true): T | undefined { + for (const v of this) { + if (predicate(v)) { + return v; + } + } + return undefined; + } + + last(predicate: (_: T) => boolean = (_) => true): T | undefined { + let result: T | undefined; + for (const v of this) { + if (predicate(v)) { + result = v; + } + } + return result; + } + + max(comparator: (a: T, b: T) => number): T | undefined { + let result: T | undefined; + let firstItem = true; + for (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = comparator(result!, v) > 0 ? result : v; + } + } + return result; + } + + min(comparator: (a: T, b: T) => number): T | undefined { + let result: T | undefined; + let firstItem = true; + for (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = comparator(result!, v) < 0 ? result : v; + } + } + return result; + } + + reduce(adder: (a: T, b: T) => T, initial?: T): T { + const hasInitial = arguments.length >= 2; + let firstItem = !hasInitial; + let result = initial; + for (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = adder(result!, v); + } + } + if (firstItem) { + throw new TypeError("reduce without initial value but stream is empty"); + } + return result!; + } + + fold(adder: (a: T, b: T) => T, initial?: T): T | undefined { + const hasInitial = arguments.length >= 2; + let firstItem = !hasInitial; + let result = initial; + for (const v of this) { + if (firstItem) { + result = v; + firstItem = false; + } else { + result = adder(result!, v); + } + } + return result; + } + + toArray(): T[] { + const result = [] as T[]; + for (const v of this) { + result.push(v); + } + return result; + } +} + +// +// IteratorStream Factory +// + +function iteratorStreamFrom( + it: Iterable | Iterator, +): IteratorStream { + if (typeof it[Symbol.asyncIterator] === "function") { + return new IteratorStreamOfIterator(it[Symbol.asyncIterator]()); + } + if (typeof it[Symbol.iterator] === "function") { + return new IteratorStreamOfIterator(it[Symbol.iterator]()); + } + return new IteratorStreamOfIterator(it as Iterator); +} diff --git a/src/polyfill/array.ts b/src/polyfill/array.ts deleted file mode 100644 index 539c82c..0000000 --- a/src/polyfill/array.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { AsyncStream, asyncStreamIterable } from "../index"; - -declare global { - interface Array { - asyncStream(): AsyncStream; - } -} - -Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; diff --git a/src/polyfill/asyncGenerator.test.ts b/src/polyfill/asyncGenerator.test.ts deleted file mode 100644 index ec44b0b..0000000 --- a/src/polyfill/asyncGenerator.test.ts +++ /dev/null @@ -1,14 +0,0 @@ -import "./asyncGenerator"; - -test("AsyncGenerator.stream polyfill works", async () => { - async function* gen() { - yield* [1, 2, 3]; - } - - const r = await gen() - .stream() - .map((x) => x * 2) - .toArray(); - - expect(r).toEqual([2, 4, 6]); -}); diff --git a/src/polyfill/asyncGenerator.ts b/src/polyfill/asyncGenerator.ts deleted file mode 100644 index b604f6d..0000000 --- a/src/polyfill/asyncGenerator.ts +++ /dev/null @@ -1,15 +0,0 @@ -import { AsyncStream, streamAsyncIterable } from "../index"; - -declare global { - interface AsyncGenerator { - stream(): AsyncStream; - } -} - -const AsyncGeneratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf((async function* () {})()), -); - -AsyncGeneratorPrototype.stream = function () { - return streamAsyncIterable(this); -}; diff --git a/src/polyfill/generator.test.ts b/src/polyfill/generator.test.ts deleted file mode 100644 index 190ee47..0000000 --- a/src/polyfill/generator.test.ts +++ /dev/null @@ -1,64 +0,0 @@ -import "./generator"; - -test("Generator.stream polyfill works with generator functions", async () => { - function* gen() { - yield* [1, 2, 3]; - } - - const r = await gen() - .asyncStream() - .map((x) => x * 2) - .toArray(); - - expect(r).toEqual([2, 4, 6]); -}); - -test("Generator.stream polyfill works with Array.values()", async () => { - const gen = [1, 2, 3].values(); - - const r = await gen - .asyncStream() - .map((x) => x * 2) - .toArray(); - - expect(r).toEqual([2, 4, 6]); -}); - -test("Generator.stream polyfill works with Map.keys()", async () => { - const map = new Map(); - map.set(1, "a"); - map.set(2, "b"); - map.set(3, "c"); - const gen = map.keys(); - - const r = await gen - .asyncStream() - .map((x) => x * 2) - .toArray(); - - expect(r).toEqual([2, 4, 6]); -}); - -test("Generator.stream polyfill works with Map.values()", async () => { - const map = new Map(); - map.set(1, "a"); - map.set(2, "b"); - map.set(3, "c"); - const gen = map.values(); - - const r = await gen - .asyncStream() - .map((x) => x.toUpperCase()) - .toArray(); - - expect(r).toEqual(["A", "B", "C"]); -}); - -test("the Array prototype is untouched by the polyfill", async () => { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const arr = [1, 2, 3] as any; - - const f = () => arr.stream(); - - expect(f).toThrow(TypeError); -}); diff --git a/src/polyfill/generator.ts b/src/polyfill/generator.ts deleted file mode 100644 index 8bc7bf6..0000000 --- a/src/polyfill/generator.ts +++ /dev/null @@ -1,27 +0,0 @@ -import { AsyncStream, asyncStreamIterable } from "../index"; - -declare global { - interface Generator { - asyncStream(): AsyncStream; - } - - interface IterableIterator { - asyncStream(): AsyncStream; - } -} - -const GeneratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf((function* () {})()), -); - -const IterableIteratorPrototype = Object.getPrototypeOf( - Object.getPrototypeOf([].values()), -); - -GeneratorPrototype.asyncStream = function () { - return asyncStreamIterable(this); -}; - -IterableIteratorPrototype.asyncStream = function () { - return asyncStreamIterable(this); -}; diff --git a/src/polyfill/index.ts b/src/polyfill/index.ts deleted file mode 100644 index 0d779bd..0000000 --- a/src/polyfill/index.ts +++ /dev/null @@ -1,6 +0,0 @@ -import "./array"; -import "./map"; -import "./readable"; -import "./set"; -import "./string"; -import "./typedArray"; diff --git a/src/polyfill/map.ts b/src/polyfill/map.ts deleted file mode 100644 index 97837a6..0000000 --- a/src/polyfill/map.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { AsyncStream, asyncStreamIterable } from "../index"; - -declare global { - interface Map { - asyncStream(): AsyncStream<[K, V]>; - asyncStreamKeys(): AsyncStream; - asyncStreamValues(): AsyncStream; - } -} - -Map.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; - -Map.prototype.asyncStreamKeys = function () { - return asyncStreamIterable(this.keys()); -}; - -Map.prototype.asyncStreamValues = function () { - return asyncStreamIterable(this.values()); -}; diff --git a/src/polyfill/readable.ts b/src/polyfill/readable.ts deleted file mode 100644 index 5c7f96f..0000000 --- a/src/polyfill/readable.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { Readable, Duplex } from "stream"; -import { AsyncStream } from "../index"; -import { streamLines } from "../lines"; - -declare module "stream" { - interface Readable { - streamLines(encoding?: BufferEncoding): AsyncStream; - } - interface Duplex { - streamLines(encoding?: BufferEncoding): AsyncStream; - } -} - -Readable.prototype.streamLines = function (encoding?: BufferEncoding) { - return streamLines(this, encoding); -}; - -Duplex.prototype.streamLines = function (encoding?: BufferEncoding) { - return streamLines(this, encoding); -}; diff --git a/src/polyfill/set.ts b/src/polyfill/set.ts deleted file mode 100644 index 33dc46c..0000000 --- a/src/polyfill/set.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { AsyncStream, asyncStreamIterable } from "../index"; - -declare global { - interface Set { - asyncStream(): AsyncStream; - } -} - -Set.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; diff --git a/src/polyfill/string.ts b/src/polyfill/string.ts deleted file mode 100644 index b2eb4d1..0000000 --- a/src/polyfill/string.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { AsyncStream, asyncStreamIterable } from "../index"; - -declare global { - interface String { - charAsyncStream(): AsyncStream; - } -} - -String.prototype.charAsyncStream = function () { - return asyncStreamIterable(this); -}; diff --git a/src/polyfill/typedArray.ts b/src/polyfill/typedArray.ts deleted file mode 100644 index 5822a8a..0000000 --- a/src/polyfill/typedArray.ts +++ /dev/null @@ -1,59 +0,0 @@ -import { AsyncStream, asyncStreamIterable } from "../index"; - -declare global { - interface Int8Array { - asyncStream(): AsyncStream; - } - interface Uint8Array { - asyncStream(): AsyncStream; - } - interface Uint8ClampedArray { - asyncStream(): AsyncStream; - } - interface Int16Array { - asyncStream(): AsyncStream; - } - interface Uint16Array { - asyncStream(): AsyncStream; - } - interface Int32Array { - asyncStream(): AsyncStream; - } - interface Uint32Array { - asyncStream(): AsyncStream; - } - interface Float32Array { - asyncStream(): AsyncStream; - } - interface Float64Array { - asyncStream(): AsyncStream; - } -} - -Int8Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Uint8Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Uint8ClampedArray.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Int16Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Uint16Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Int32Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Uint32Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Float32Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; -Float64Array.prototype.asyncStream = function () { - return asyncStreamIterable(this); -}; diff --git a/src/random.ts b/src/random.ts deleted file mode 100644 index d6ecb5c..0000000 --- a/src/random.ts +++ /dev/null @@ -1,11 +0,0 @@ -import { AsyncStream, asyncStream } from "./index"; - -export function streamRandomBytes(): AsyncStream { - async function* randomSource() { - while (true) { - yield Math.random(); - } - } - - return asyncStream(randomSource()); -} diff --git a/src/range.ts b/src/range.ts deleted file mode 100644 index cde864f..0000000 --- a/src/range.ts +++ /dev/null @@ -1,25 +0,0 @@ -import { AsyncStream, asyncStream } from "./index"; - -export function streamRange( - startInclusive: number, - endExclusive: number, -): AsyncStream { - async function* rangeSource() { - for (let i = startInclusive; i < endExclusive; i++) { - yield i; - } - } - return asyncStream(rangeSource()); -} - -export function streamClosedRange( - startInclusive: number, - endInclusive: number, -): AsyncStream { - async function* rangeSource() { - for (let i = startInclusive; i <= endInclusive; i++) { - yield i; - } - } - return asyncStream(rangeSource()); -} diff --git a/src/readableAsyncIterator.ts b/src/readableAsyncIterator.ts deleted file mode 100644 index ac4744c..0000000 --- a/src/readableAsyncIterator.ts +++ /dev/null @@ -1,85 +0,0 @@ -import { Readable } from "stream"; - -function makePipe() { - const puts: T[] = []; - const takes: ((v: IteratorResult) => void)[] = []; - let closed = false; - - function put(n: T) { - if (closed) { - return; - } - const nextInLine = takes.shift(); - if (nextInLine) { - nextInLine({ done: false, value: n }); - } else { - puts.push(n); - } - } - - function close() { - closed = true; - } - - function next(): Promise> { - const next = puts.shift(); - if (next) { - return Promise.resolve({ done: false, value: next }); - } - if (closed) { - return Promise.resolve({ done: true, value: undefined }); - } - - return new Promise>((resolve) => takes.push(resolve)); - } - - return { put, next, close }; -} - -export interface Splitter { - initial(): R; - split(chunk: B, previous: R): [T[], R]; - last(remainder: R): T | null; -} - -export function readableAsyncIterator( - readable: Readable, - by: Splitter, -): AsyncIterator { - const { next, ...pipe } = makePipe(); - - let remainder = by.initial(); - - const dataListener = (chunk: Buffer) => { - const [items, nextRemainder] = by.split(chunk, remainder); - - items.forEach(pipe.put); - - remainder = nextRemainder; - }; - - const endListener = () => { - const lastItem = by.last(remainder); - if (lastItem !== null) { - pipe.put(lastItem); - } - - cleanUp(); - }; - - readable.on("data", dataListener); - readable.on("end", endListener); - - function cleanUp() { - pipe.close(); - readable.removeListener("data", dataListener); - readable.removeListener("end", endListener); - } - - function iteratorReturn(): Promise> { - cleanUp(); - return Promise.resolve({ done: true, value: undefined }); - } - - return { next, return: iteratorReturn }; -} diff --git a/src/repeated.ts b/src/repeated.ts deleted file mode 100644 index c763172..0000000 --- a/src/repeated.ts +++ /dev/null @@ -1,10 +0,0 @@ -import { AsyncStream, asyncStream } from "./index"; - -export function streamRepeated(value: T, count: number): AsyncStream { - async function* repeatedSource() { - for (let i = 0; i < count; i++) { - yield value; - } - } - return asyncStream(repeatedSource()); -} diff --git a/src/secureRandom.ts b/src/secureRandom.ts deleted file mode 100644 index 902f9a8..0000000 --- a/src/secureRandom.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { AsyncStream, asyncStream } from "./index"; -import { promisify } from "util"; -import { randomBytes as randomBytesCb } from "crypto"; - -const randomBytes = promisify(randomBytesCb); - -export function streamSecureRandomBytes( - bufferSize: number = 64, -): AsyncStream { - async function* secureRandomSource() { - while (true) { - const data = await randomBytes(bufferSize); - for (const byte of data) { - yield byte; - } - } - } - - return asyncStream(secureRandomSource()); -} diff --git a/src/sources/async-generator.ts b/src/sources/async-generator.ts new file mode 100644 index 0000000..c49b498 --- /dev/null +++ b/src/sources/async-generator.ts @@ -0,0 +1,27 @@ +export function makeAsyncGenerator< + T = unknown, + TReturn = unknown, + TNext = undefined, +>( + start: () => void, + next: (...args: [] | [TNext]) => Promise>, + stop: () => TReturn, +): AsyncGenerator { + async function doStop(): Promise> { + const value = stop(); + return { done: true, value }; + } + const iterator: AsyncIterator = { + next, + return: doStop, + throw: doStop, + }; + const iterable = { [Symbol.asyncIterator]: () => iterator }; + async function* generator(): AsyncGenerator { + start(); + + return yield* iterable; + } + + return generator(); +} diff --git a/src/sources/event.ts b/src/sources/event.ts new file mode 100644 index 0000000..6c3d236 --- /dev/null +++ b/src/sources/event.ts @@ -0,0 +1,78 @@ +import { makePipe } from "../util/pipe"; +import { makeAsyncGenerator } from "./async-generator"; + +type EventHandler = (event: T) => unknown; + +type EventEmitter = { + // jQuery and Node.js NodeEventTarget & EventEmitter + // https://api.jquery.com/on/ + // https://nodejs.org/api/events.html + on?(eventName: string | symbol, listener: EventHandler): unknown; + off?(eventName: string | symbol, listener: EventHandler): unknown; + + // Node.js NodeEventTarget & EventEmitter + // https://nodejs.org/api/events.html + addListener?(eventName: string | symbol, listener: EventHandler): unknown; + removeListener?( + eventName: string | symbol, + listener: EventHandler, + ): unknown; + + // DOM EventTarget and Node.js EventTarget + // https://developer.mozilla.org/en-US/docs/Web/API/EventTarget/addEventListener + // https://nodejs.org/api/events.html + addEventListener?( + eventName: string | symbol, + listener: EventHandler, + options?: object, + ): unknown; + removeEventListener?( + eventName: string | symbol, + listener: EventHandler, + options?: object, + ): unknown; +}; + +export function fromEvent( + target: EventEmitter, + eventName: string | symbol, + options?: object, +): AsyncGenerator { + if (typeof target.on === "function") { + return fromEventPattern( + (handler) => target.on!(eventName, handler), + (handler) => target.off?.(eventName, handler), + ); + } else if (typeof target.addListener === "function") { + return fromEventPattern( + (handler) => target.addListener!(eventName, handler), + (handler) => target.removeListener?.(eventName, handler), + ); + } else if (typeof target.addEventListener === "function") { + return fromEventPattern( + (handler) => target.addEventListener!(eventName, handler, options), + (handler) => target.removeEventListener?.(eventName, handler, options), + ); + } + throw new TypeError("unsupported event target"); +} + +export function fromEventPattern( + addHandler: (handler: EventHandler) => unknown, + removeHandler?: (handler: EventHandler) => void, +): AsyncGenerator { + const { next, put, close } = makePipe(); + + const handler: EventHandler = (value) => put({ value }); + + function start() { + addHandler(handler); + } + + function stop() { + close(); + removeHandler?.(handler); + } + + return makeAsyncGenerator(start, next, stop); +} diff --git a/src/sources/interval.ts b/src/sources/interval.ts new file mode 100644 index 0000000..29a4a96 --- /dev/null +++ b/src/sources/interval.ts @@ -0,0 +1,39 @@ +import { makePipe } from "../util/pipe"; +import { makeAsyncGenerator } from "./async-generator"; + +export interface IntervalScheduler { + schedule(interval: number, listener: () => unknown): T; + cancel(scheduled: T): void; +} + +export const defaultScheduler = { + schedule(interval: number, listener: () => unknown) { + return setInterval(listener, interval); + }, + cancel(scheduled) { + clearInterval(scheduled as number); + }, +} as IntervalScheduler; + +export function iteratorInterval( + periodMillis: number, + scheduler: IntervalScheduler = defaultScheduler, +): AsyncGenerator { + const { next, ...pipe } = makePipe(); + + let timer; + + function start() { + let n = 0; + timer = scheduler.schedule(periodMillis, () => { + pipe.put({ value: n++ }); + }); + } + + function stop() { + pipe.close(); + scheduler.cancel(timer); + } + + return makeAsyncGenerator(start, next, stop); +} diff --git a/src/sources/random.ts b/src/sources/random.ts new file mode 100644 index 0000000..16db10d --- /dev/null +++ b/src/sources/random.ts @@ -0,0 +1,9 @@ +export function iteratorRandom(): Generator { + function* randomSource() { + while (true) { + yield Math.random(); + } + } + + return randomSource(); +} diff --git a/src/sources/range.test.ts b/src/sources/range.test.ts new file mode 100644 index 0000000..92c013d --- /dev/null +++ b/src/sources/range.test.ts @@ -0,0 +1,97 @@ +import { iteratorRange } from "./range"; + +test("streamRange with step 0 and start != end throws TypeError", () => { + const f = () => iteratorRange(1, 2, 0); + + expect(f).toThrow(TypeError); +}); + +test("streamRange with step 0 and start == end is empty", () => { + const f = () => iteratorRange(1, 1, 0); + + expect(Array.from(f())).toEqual([]); +}); + +test("streamRange with step 0 and start == end (inclusive) has 1 element", () => { + const f = iteratorRange(1, 1, { step: 0, inclusive: true }); + + expect(Array.from(f)).toEqual([1]); +}); + +test("streamRange with start == end is empty", () => { + const f = () => iteratorRange(1, 1); + + expect(Array.from(f())).toEqual([]); +}); + +test("streamRange with start == end and step 1 is empty", () => { + const f = () => iteratorRange(1, 1, 1); + + expect(Array.from(f())).toEqual([]); +}); + +test("streamRange with start == end and step -1 is empty", () => { + const f = () => iteratorRange(1, 1, -1); + + expect(Array.from(f())).toEqual([]); +}); + +test("streamRange with start == end and step 1 inclusive has 1 element", () => { + const f = () => iteratorRange(1, 1, { inclusive: true, step: 1 }); + + expect(Array.from(f())).toEqual([1]); +}); + +test("streamRange with start == end and step -1 inclusive has 1 element", () => { + const f = () => iteratorRange(1, 1, { inclusive: true, step: -1 }); + + expect(Array.from(f())).toEqual([1]); +}); + +test("streamRange with step 1 and start > end throws TypeError", () => { + const f = () => iteratorRange(2, 1, 1); + + expect(f).toThrow(TypeError); +}); + +test("streamRange from 1 to 3 step -0.5 throws TypeError", () => { + const f = () => iteratorRange(1, 3, -0.5); + + expect(f).toThrow(TypeError); +}); + +test("streamRange from 1 to 3 works", () => { + const f = iteratorRange(1, 3); + + expect(Array.from(f)).toEqual([1, 2]); +}); + +test("streamRange from 1 to 3 works", () => { + const f = iteratorRange(1, 3); + + expect(Array.from(f)).toEqual([1, 2]); +}); + +test("streamRange from 1 to 3 inclusive works", () => { + const f = iteratorRange(1, 3, { inclusive: true }); + + expect(Array.from(f)).toEqual([1, 2, 3]); +}); + +test("streamRange from 3 to 1 works", () => { + const f = iteratorRange(3, 1); + + expect(Array.from(f)).toEqual([3, 2]); +}); + +test("streamRange from 3 to 1 inclusive works", () => { + const f = iteratorRange(3, 1, { inclusive: true }); + + expect(Array.from(f)).toEqual([3, 2, 1]); +}); + +test("streamRange from 1 to 3 step 0.5 works", () => { + const f = iteratorRange(1, 3, 0.5); + + expect(Array.from(f)).toEqual([1, 1.5, 2, 2.5]); +}); diff --git a/src/sources/range.ts b/src/sources/range.ts new file mode 100644 index 0000000..2cc2fcc --- /dev/null +++ b/src/sources/range.ts @@ -0,0 +1,39 @@ +export function iteratorRange( + start: number, + end: number, + options?: { step?: number; inclusive?: boolean } | number, +): Generator { + const { step = Math.sign(end - start), inclusive = false } = + typeof options === "undefined" + ? { step: Math.sign(end - start) } + : typeof options === "number" + ? { step: options } + : options; + if (end != start) { + if (Math.sign(step) != Math.sign(end - start)) { + throw new TypeError("step must be in the direction from start to end"); + } + } + + function* rangeSource() { + if (step == 0 && inclusive) { + yield end; + return; + } + + const withinRange = + end > start || step > 0 + ? inclusive + ? (i) => i <= end + : (i) => i < end + : inclusive + ? (i) => i >= end + : (i) => i > end; + + for (let i = start; withinRange(i); i += step) { + yield i; + } + } + + return rangeSource(); +} diff --git a/src/sources/readable-chunks.ts b/src/sources/readable-chunks.ts new file mode 100644 index 0000000..8b1c775 --- /dev/null +++ b/src/sources/readable-chunks.ts @@ -0,0 +1,50 @@ +import { Readable, Writable } from "node:stream"; +import { makePipe } from "../util/pipe"; +import { makeAsyncGenerator } from "./async-generator"; + +export function readableChunks( + readable: Readable, + encoding?: BufferEncoding, +): AsyncGenerator { + const { next, ...pipe } = makePipe(); + + const writable = new Writable({ + defaultEncoding: encoding, + write(chunk: Buffer, _encoding, callback) { + const put = pipe.put({ value: chunk }); + + // Apply back-pressure by awaiting consumption of produced items... + put.then(() => callback()); + }, + }); + + const errorListener = (error) => { + pipe.put({ error }); + stop(); + }; + + const endListener = () => { + // Allow consumers to read any buffered data + stop(); + }; + + function start() { + // Use pipe instead of on("data") because unpipe() releases the stream, + // putting the stream in paused mode. + // See https://nodejs.org/api/stream.html#two-reading-modes + readable.pipe(writable); + readable.on("end", endListener); + readable.on("close", endListener); + readable.on("error", errorListener); + } + + function stop() { + pipe.close(); + readable.unpipe(writable); + readable.off("end", endListener); + readable.off("close", endListener); + readable.off("error", errorListener); + } + + return makeAsyncGenerator(start, next, stop); +} diff --git a/src/lines.ts b/src/sources/readable-lines.ts similarity index 60% rename from src/lines.ts rename to src/sources/readable-lines.ts index f3a6e3c..d530ab7 100644 --- a/src/lines.ts +++ b/src/sources/readable-lines.ts @@ -1,6 +1,5 @@ -import { Readable } from "stream"; -import { AsyncStream, asyncStream } from "./index"; -import { Splitter, readableAsyncIterator } from "./readableAsyncIterator"; +import { Readable } from "node:stream"; +import { Splitter, readableSplit } from "./readable-split"; function stringSplitter( encoding?: BufferEncoding, @@ -25,9 +24,10 @@ function stringSplitter( }; } -export function streamLines( +export function readableLines( readable: Readable, - encoding?: BufferEncoding, -): AsyncStream { - return asyncStream(readableAsyncIterator(readable, stringSplitter(encoding))); + encoding: BufferEncoding = "utf-8", +): AsyncGenerator { + readable.setEncoding(encoding); + return readableSplit(readable, stringSplitter(encoding), encoding); } diff --git a/src/sources/readable-lines2.ts b/src/sources/readable-lines2.ts new file mode 100644 index 0000000..5cb17a1 --- /dev/null +++ b/src/sources/readable-lines2.ts @@ -0,0 +1,17 @@ +import { Readable } from "node:stream"; +import { createInterface } from "node:readline"; + +export async function* readableLines( + readable: Readable, + encoding: BufferEncoding = "utf-8", +): AsyncGenerator { + readable.setEncoding(encoding); + const rl = createInterface({ + input: readable, + }); + try { + yield* rl; + } finally { + rl.close(); + } +} diff --git a/src/sources/readable-split.ts b/src/sources/readable-split.ts new file mode 100644 index 0000000..96efe35 --- /dev/null +++ b/src/sources/readable-split.ts @@ -0,0 +1,26 @@ +import { Readable } from "node:stream"; +import { readableChunks } from "./readable-chunks"; + +export interface Splitter { + initial(): R; + split(chunk: B, previous: R): [T[], R]; + last(remainder: R): T | null; +} + +export async function* readableSplit( + readable: Readable, + by: Splitter, + encoding?: BufferEncoding, +): AsyncGenerator { + let remainder = by.initial(); + for await (const chunk of readableChunks(readable, encoding)) { + const [items, nextRemainder] = by.split(chunk, remainder); + yield* items; + remainder = nextRemainder; + } + + const lastItem = by.last(remainder); + if (lastItem !== null) { + yield lastItem; + } +} diff --git a/src/sources/repeated.ts b/src/sources/repeated.ts new file mode 100644 index 0000000..4059106 --- /dev/null +++ b/src/sources/repeated.ts @@ -0,0 +1,11 @@ +export function iteratorRepeat( + value: T, + count: number = Infinity, +): Generator { + function* repeatedSource() { + for (let i = 0; i < count; i++) { + yield value; + } + } + return repeatedSource(); +} diff --git a/src/sources/secure-random.test.ts b/src/sources/secure-random.test.ts new file mode 100644 index 0000000..3c30eae --- /dev/null +++ b/src/sources/secure-random.test.ts @@ -0,0 +1,10 @@ +import "../factories/async-iterator"; +import { iteratorSecureRandom } from "./secure-random"; + +test("streamSecureRandom generates 20 bytes", async () => { + const gen = iteratorSecureRandom(16); + + const result = await gen.stream().take(20).toArray(); + + expect(result).toHaveLength(20); +}); diff --git a/src/sources/secure-random.ts b/src/sources/secure-random.ts new file mode 100644 index 0000000..b51f79f --- /dev/null +++ b/src/sources/secure-random.ts @@ -0,0 +1,17 @@ +import { promisify } from "node:util"; +import { randomBytes as randomBytesCb } from "node:crypto"; + +const randomBytes = promisify(randomBytesCb); + +export function iteratorSecureRandom( + bufferSize: number = 64, +): AsyncGenerator { + async function* secureRandomSource() { + while (true) { + const data = await randomBytes(bufferSize); + yield* data; + } + } + + return secureRandomSource(); +} diff --git a/src/test_cli.ts b/src/test_cli.ts deleted file mode 100644 index cd8da58..0000000 --- a/src/test_cli.ts +++ /dev/null @@ -1,27 +0,0 @@ -import "./polyfill"; - -process.stdin - .streamLines() - .map((s) => s.toLocaleUpperCase()) - .filter((s) => s.length > 3) - .batch(3) - .limit(2) - .forEach(console.log) - .then((_) => console.log("-- all done --")); - -process.stdin - .streamLines() - .map((s) => s.toLocaleUpperCase()) - .limit(2) - .filter((s) => s.length > 0) - .peek((s) => { - if (s.length > 5) throw "too long!"; - }) - .max((a, b) => a.length - b.length) - .then(console.log) - .catch(console.error); - -process.stdin - .streamLines() - .map((s) => s.substr(0, 2).toLocaleLowerCase()) - .forEach(console.log); diff --git a/src/util/pipe.ts b/src/util/pipe.ts new file mode 100644 index 0000000..278c1ce --- /dev/null +++ b/src/util/pipe.ts @@ -0,0 +1,60 @@ +export function makePipe() { + type Message = { value?: T; error?: Error }; + type Resolver = { + resolve: (v: X) => void; + reject: (e?: Error) => void; + }; + + const puts: [Message, () => void][] = []; + const takes: Resolver>[] = []; + let closed = false; + + function put(message: Message) { + if (closed) { + return Promise.resolve(); + } + const nextInLine = takes.shift(); + + if (nextInLine) { + const { value, error } = message; + if (value !== undefined) { + nextInLine.resolve({ done: false, value }); + } else { + nextInLine.reject(error!); + } + return Promise.resolve(); + } else { + return new Promise((resolve) => { + puts.push([message, resolve]); + }); + } + } + + function close() { + closed = true; + takes.forEach((next) => next.resolve({ done: true, value: undefined })); + takes.length = 0; + } + + function next(): Promise> { + const [next, resolveNext] = puts.shift() ?? []; + if (next) { + resolveNext!(); + const { value, error } = next; + if (value !== undefined) { + return Promise.resolve({ done: false, value }); + } else { + return Promise.reject(error); + } + } + if (closed) { + return Promise.resolve({ done: true, value: undefined }); + } + + return new Promise>((resolve, reject) => + takes.push({ resolve, reject }), + ); + } + + return { put, next, close }; +} diff --git a/tsconfig.json b/tsconfig.json index 658f2f1..dd867fd 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -14,5 +14,5 @@ "outDir": "." }, "include": ["src/**/*"], - "exclude": ["node_modules"] + "exclude": ["src/**/*.test.ts", "node_modules"] }