Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: allow stream to stay open after take #47023

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2437,6 +2437,9 @@ added:
* `options` {Object}
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* `destroyStream` {boolean} When set to `false`, the stream will not be
closed after take is finished unless the stream had an error.
**Default:** `true`.
* Returns: {Readable} a stream with `limit` chunks taken.

This method returns a new stream with the first `limit` chunks.
Expand All @@ -2447,6 +2450,26 @@ import { Readable } from 'node:stream';
await Readable.from([1, 2, 3, 4]).take(2).toArray(); // [1, 2]
```

Using the `destroyStream: false` option prevents `take` from closing the stream,
so that the remaining stream data can be consumed later on

```mjs
import fs from 'node:fs';
import { Readable } from 'node:stream';

const csvParsedStream = fs
.createReadStream('file.csv')
.compose(myAwesomeParseCSV());

const [columns] = await csvParsedStream
.take(1)
.toArray();

const parsed = await csvParsedStream
.map((row) => parseRowByColumns(row, columns))
.toArray();
```

##### `readable.asIndexedPairs([options])`

<!-- YAML
Expand Down
31 changes: 24 additions & 7 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ const {
validateAbortSignal,
validateInteger,
validateObject,
validateBoolean,
} = require('internal/validators');
const { kWeakHandler } = require('internal/event_target');
const { finished } = require('internal/streams/end-of-stream');
Expand Down Expand Up @@ -397,21 +398,37 @@ function take(number, options = undefined) {
if (options?.signal != null) {
validateAbortSignal(options.signal, 'options.signal');
}
if (options?.destroyStream != null) {
validateBoolean(options.destroyStream, 'options.destroyStream');
}
Comment on lines +401 to +403
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe another name would be better? Not sure what though... @benjamingr


number = toIntegerOrInfinity(number);
return async function* take() {
if (options?.signal?.aborted) {
throw new AbortError();
}
for await (const val of this) {
if (options?.signal?.aborted) {
throw new AbortError();
try {
for await (const val of this.iterator({ destroyOnReturn: options?.destroyStream ?? true })) {
if (options?.signal?.aborted) {
throw new AbortError();
}
if (number-- > 0) {
yield val;

// Don't get another item from iterator in case we reached the end
if (number === 0) {
return;
}
} else {
return;
}
}
if (number-- > 0) {
yield val;
} else {
return;
} catch (e) {
if (!this.destroyed) {
this.destroy(e);
}

throw e;
}
}.call(this);
}
Expand Down
51 changes: 50 additions & 1 deletion test/parallel/test-stream-drop-take.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const common = require('../common');
const {
Readable,
} = require('stream');
const { deepStrictEqual, rejects, throws } = require('assert');
const { deepStrictEqual, rejects, throws, strictEqual } = require('assert');

const { from } = Readable;

Expand Down Expand Up @@ -100,3 +100,52 @@ const naturals = () => from(async function*() {
throws(() => Readable.from([1]).take(1, 1), /ERR_INVALID_ARG_TYPE/);
throws(() => Readable.from([1]).take(1, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}

{
(async () => {
const streamShouldCloseWithoutOption = from([1, 2, 3, 4, 5]);

// Close stream by default
await streamShouldCloseWithoutOption.take(2).toArray();
strictEqual(streamShouldCloseWithoutOption.destroyed, true);
})().then(common.mustCall());
}

{
(async () => {
const streamShouldCloseWithOption = from([1, 2, 3, 4, 5]);

await streamShouldCloseWithOption.take(2, { destroyStream: true }).toArray();
strictEqual(streamShouldCloseWithOption.destroyed, true);
})().then(common.mustCall());
}

{
(async () => {
const streamShouldNotClose = from([1, 2, 3, 4, 5]);

// Do not close stream
await streamShouldNotClose.take(2, { destroyStream: false }).toArray();
strictEqual(streamShouldNotClose.destroyed, false);

deepStrictEqual(await streamShouldNotClose.toArray(), [3, 4, 5]);
strictEqual(streamShouldNotClose.destroyed, true);
})().then(common.mustCall());
}

{
const errorToThrow = new Error('should close');

const streamShouldNotClose = from((function *() {
yield 1;
throw errorToThrow;
})());

streamShouldNotClose.take(3, { destroyStream: false })
.toArray()
.then(common.mustNotCall())
.catch(common.mustCall((error) => {
strictEqual(streamShouldNotClose.destroyed, true);
strictEqual(error, errorToThrow);
}));
}