Skip to content

Commit

Permalink
stream: add highWaterMark for the map operator
Browse files Browse the repository at this point in the history
this is done so we don't wait for the first items to
finish before starting new ones

Fixes: #46132

Co-authored-by: Robert Nagy <[email protected]>
PR-URL: #49249
Reviewed-By: Matteo Collina <[email protected]>
Reviewed-By: Benjamin Gruenbaum <[email protected]>
Reviewed-By: Robert Nagy <[email protected]>
  • Loading branch information
2 people authored and targos committed Nov 26, 2023
1 parent 3f771ca commit 651e450
Show file tree
Hide file tree
Showing 4 changed files with 226 additions and 16 deletions.
12 changes: 12 additions & 0 deletions doc/api/stream.md
Original file line number Diff line number Diff line change
Expand Up @@ -2008,6 +2008,10 @@ showBoth();
added:
- v17.4.0
- v16.14.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/49249
description: added `highWaterMark` in options.
-->

> Stability: 1 - Experimental
Expand All @@ -2021,6 +2025,8 @@ added:
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `highWaterMark` {number} how many items to buffer while waiting for user
consumption of the mapped items. **Default:** `concurrency * 2 - 1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream mapped with the function `fn`.
Expand Down Expand Up @@ -2055,6 +2061,10 @@ for await (const result of dnsResults) {
added:
- v17.4.0
- v16.14.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/49249
description: added `highWaterMark` in options.
-->

> Stability: 1 - Experimental
Expand All @@ -2067,6 +2077,8 @@ added:
* `options` {Object}
* `concurrency` {number} the maximum concurrent invocation of `fn` to call
on the stream at once. **Default:** `1`.
* `highWaterMark` {number} how many items to buffer while waiting for user
consumption of the filtered items. **Default:** `concurrency * 2 - 1`.
* `signal` {AbortSignal} allows destroying the stream if the signal is
aborted.
* Returns: {Readable} a stream filtered with the predicate `fn`.
Expand Down
55 changes: 41 additions & 14 deletions lib/internal/streams/operators.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ const {
NumberIsNaN,
Promise,
PromiseReject,
PromiseResolve,
PromisePrototypeThen,
Symbol,
} = primordials;
Expand Down Expand Up @@ -82,7 +83,15 @@ function map(fn, options) {
concurrency = MathFloor(options.concurrency);
}

validateInteger(concurrency, 'concurrency', 1);
let highWaterMark = concurrency - 1;
if (options?.highWaterMark != null) {
highWaterMark = MathFloor(options.highWaterMark);
}

validateInteger(concurrency, 'options.concurrency', 1);
validateInteger(highWaterMark, 'options.highWaterMark', 0);

highWaterMark += concurrency;

return async function* map() {
const signal = AbortSignal.any([options?.signal].filter(Boolean));
Expand All @@ -93,9 +102,28 @@ function map(fn, options) {
let next;
let resume;
let done = false;
let cnt = 0;

function onDone() {
function onCatch() {
done = true;
afterItemProcessed();
}

function afterItemProcessed() {
cnt -= 1;
maybeResume();
}

function maybeResume() {
if (
resume &&
!done &&
cnt < concurrency &&
queue.length < highWaterMark
) {
resume();
resume = null;
}
}

async function pump() {
Expand All @@ -111,25 +139,27 @@ function map(fn, options) {

try {
val = fn(val, signalOpt);

if (val === kEmpty) {
continue;
}

val = PromiseResolve(val);
} catch (err) {
val = PromiseReject(err);
}

if (val === kEmpty) {
continue;
}
cnt += 1;

if (typeof val?.catch === 'function') {
val.catch(onDone);
}
PromisePrototypeThen(val, afterItemProcessed, onCatch);

queue.push(val);
if (next) {
next();
next = null;
}

if (!done && queue.length && queue.length >= concurrency) {
if (!done && (queue.length >= highWaterMark || cnt >= concurrency)) {
await new Promise((resolve) => {
resume = resolve;
});
Expand All @@ -138,7 +168,7 @@ function map(fn, options) {
queue.push(kEof);
} catch (err) {
const val = PromiseReject(err);
PromisePrototypeThen(val, undefined, onDone);
PromisePrototypeThen(val, afterItemProcessed, onCatch);
queue.push(val);
} finally {
done = true;
Expand Down Expand Up @@ -169,10 +199,7 @@ function map(fn, options) {
}

queue.shift();
if (resume) {
resume();
resume = null;
}
maybeResume();
}

await new Promise((resolve) => {
Expand Down
2 changes: 1 addition & 1 deletion test/parallel/test-stream-forEach.js
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ const { once } = require('events');
Readable.from([1, 2, 3, 4]).forEach(async (_, { signal }) => {
calls++;
await once(signal, 'abort');
}, { signal: ac.signal, concurrency: 2 });
}, { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
// pump
assert.rejects(async () => {
await forEachPromise;
Expand Down
173 changes: 172 additions & 1 deletion test/parallel/test-stream-map.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,25 @@ const assert = require('assert');
const { once } = require('events');
const { setTimeout } = require('timers/promises');

function createDependentPromises(n) {
const promiseAndResolveArray = [];

for (let i = 0; i < n; i++) {
let res;
const promise = new Promise((resolve) => {
if (i === 0) {
res = resolve;
return;
}
res = () => promiseAndResolveArray[i - 1][0].then(resolve);
});

promiseAndResolveArray.push([promise, res]);
}

return promiseAndResolveArray;
}

{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
Expand Down Expand Up @@ -143,7 +162,7 @@ const { setTimeout } = require('timers/promises');
const stream = range.map(common.mustCall(async (_, { signal }) => {
await once(signal, 'abort');
throw signal.reason;
}, 2), { signal: ac.signal, concurrency: 2 });
}, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
Expand Down Expand Up @@ -173,12 +192,164 @@ const { setTimeout } = require('timers/promises');
})().then(common.mustCall());
}


{
// highWaterMark with small concurrency
const finishOrder = [];

const promises = createDependentPromises(4);

const raw = Readable.from([2, 0, 1, 3]);
const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 2 });

(async () => {
await stream.toArray();

assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// highWaterMark with a lot of items and large concurrency
const finishOrder = [];

const promises = createDependentPromises(20);

const input = [10, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 11, 12, 13, 18, 15, 16, 17, 14, 19];
const raw = Readable.from(input);
// Should be
// 10, 1, 0, 3, 4, 2 | next: 0
// 10, 1, 3, 4, 2, 5 | next: 1
// 10, 3, 4, 2, 5, 7 | next: 2
// 10, 3, 4, 5, 7, 8 | next: 3
// 10, 4, 5, 7, 8, 9 | next: 4
// 10, 5, 7, 8, 9, 6 | next: 5
// 10, 7, 8, 9, 6, 11 | next: 6
// 10, 7, 8, 9, 11, 12 | next: 7
// 10, 8, 9, 11, 12, 13 | next: 8
// 10, 9, 11, 12, 13, 18 | next: 9
// 10, 11, 12, 13, 18, 15 | next: 10
// 11, 12, 13, 18, 15, 16 | next: 11
// 12, 13, 18, 15, 16, 17 | next: 12
// 13, 18, 15, 16, 17, 14 | next: 13
// 18, 15, 16, 17, 14, 19 | next: 14
// 18, 15, 16, 17, 19 | next: 15
// 18, 16, 17, 19 | next: 16
// 18, 17, 19 | next: 17
// 18, 19 | next: 18
// 19 | next: 19
//

const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 6 });

(async () => {
const outputOrder = await stream.toArray();

assert.deepStrictEqual(outputOrder, input);
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// Custom highWaterMark with a lot of items and large concurrency
const finishOrder = [];

const promises = createDependentPromises(20);

const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19];
const raw = Readable.from(input);
// Should be
// 11, 1, 0, 3, 4 | next: 0, buffer: []
// 11, 1, 3, 4, 2 | next: 1, buffer: [0]
// 11, 3, 4, 2, 5 | next: 2, buffer: [0, 1]
// 11, 3, 4, 5, 7 | next: 3, buffer: [0, 1, 2]
// 11, 4, 5, 7, 8 | next: 4, buffer: [0, 1, 2, 3]
// 11, 5, 7, 8, 9 | next: 5, buffer: [0, 1, 2, 3, 4]
// 11, 7, 8, 9, 6 | next: 6, buffer: [0, 1, 2, 3, 4, 5]
// 11, 7, 8, 9, 10 | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
// 11, 8, 9, 10, 12 | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
// 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
// 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
// 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
// 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
// 13, 18, 15, 16, 17 | next: 13, buffer: []
// 18, 15, 16, 17, 14 | next: 14, buffer: []
// 18, 15, 16, 17, 19 | next: 15, buffer: [14]
// 18, 16, 17, 19 | next: 16, buffer: [14, 15]
// 18, 17, 19 | next: 17, buffer: [14, 15, 16]
// 18, 19 | next: 18, buffer: [14, 15, 16, 17]
// 19 | next: 19, buffer: [] -- all items flushed
//

const stream = raw.map(async (item) => {
const [promise, resolve] = promises[item];
resolve();

await promise;
finishOrder.push(item);
return item;
}, { concurrency: 5, highWaterMark: 7 });

(async () => {
const outputOrder = await stream.toArray();

assert.deepStrictEqual(outputOrder, input);
assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
})().then(common.mustCall(), common.mustNotCall());
}

{
// Where there is a delay between the first and the next item it should not wait for filled queue
// before yielding to the user
const promises = createDependentPromises(3);

const raw = Readable.from([0, 1, 2]);

const stream = raw
.map(async (item) => {
if (item !== 0) {
await promises[item][0];
}

return item;
}, { concurrency: 2 })
.map((item) => {
// eslint-disable-next-line no-unused-vars
for (const [_, resolve] of promises) {
resolve();
}

return item;
});

(async () => {
await stream.toArray();
})().then(common.mustCall(), common.mustNotCall());
}

{
// Error cases
assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, {
concurrency: 'Foo'
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, {
concurrency: -1
}), /ERR_OUT_OF_RANGE/);
assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
Expand Down

0 comments on commit 651e450

Please sign in to comment.