diff --git a/lib/internal/readline/eventsToAsyncIteratorFactory.js b/lib/internal/readline/eventsToAsyncIteratorFactory.js new file mode 100644 index 00000000000000..ada6b979851437 --- /dev/null +++ b/lib/internal/readline/eventsToAsyncIteratorFactory.js @@ -0,0 +1,108 @@ +'use strict'; +const { + Promise, + SymbolAsyncIterator, + ArrayPrototypeConcat, +} = primordials; +const FixedQueue = require('internal/fixed_queue'); + +const PAUSE_THRESHOLD = 1024; +const RESUME_THRESHOLD = 1; +const ITEM_EVENTS = ['data']; +const CLOSE_EVENTS = ['close', 'end']; +const ERROR_EVENTS = ['error']; + + +function waitNext(emitter, next, events) { + return new Promise((resolve, reject) => { + const resolveNext = () => { + for (let i = 0; i < events.length; i++) + emitter.off(events[i], resolveNext); + try { + resolve(next()); + } catch (promiseError) { + reject(promiseError); + } + }; + for (let i = 0; i < events.length; i++) + emitter.once(events[i], resolveNext); + }); +} + +module.exports = function eventsToAsyncIteratorFactory(readable, { + pauseThreshold = PAUSE_THRESHOLD, + resumeThreshold = RESUME_THRESHOLD, + closeEvents = CLOSE_EVENTS, + itemEvents = ITEM_EVENTS, + errorEvents = ERROR_EVENTS, +}) { + const events = ArrayPrototypeConcat(itemEvents, errorEvents, closeEvents); + const highWaterMark = RESUME_THRESHOLD; + + const queue = new FixedQueue(); + let done = false; + let error; + let queueSize = 0; + let paused = false; + const onError = (value) => { + turn('off'); + error = value; + }; + const onClose = () => { + turn('off'); + done = true; + }; + const onItem = (value) => { + queue.push(value); + queueSize++; + if (queueSize >= pauseThreshold) { + paused = true; + readable.pause(); + } + }; + function turn(onOff) { + for (let i = 0; i < closeEvents.length; i++) + readable[onOff](closeEvents[i], onClose); + for (let i = 0; i < itemEvents.length; i++) + readable[onOff](itemEvents[i], onItem); + for (let i = 0; i < itemEvents.length; i++) + readable[onOff](errorEvents[i], onError); + } + + turn('on'); + + function next() { + if (!queue.isEmpty()) { + const value = queue.shift(); + queueSize--; + if (queueSize < resumeThreshold) { + paused = false; + readable.resume(); + } + return { + done: false, + value, + }; + } + if (error) { + throw error; + } + if (done) { + return { done }; + } + return waitNext(readable, next, events); + } + + const result = { + next, + highWaterMark, + get isPaused() { + return paused; + }, + get queueSize() { + return queueSize; + } + }; + result[SymbolAsyncIterator] = () => result; + return result; +}; diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index e50172f5628ccc..4c6153cf132f13 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -64,9 +64,6 @@ const { const { StringDecoder } = require('string_decoder'); -// Lazy load Readable for startup performance. -let Readable; - const kHistorySize = 30; const kMincrlfDelay = 100; // \r\n, \n, or \r followed by something other than \n @@ -1185,41 +1182,16 @@ class Interface extends InterfaceConstructor { * @returns {InterfaceAsyncIterator} */ [SymbolAsyncIterator]() { - if (this[kLineObjectStream] === undefined) { - if (Readable === undefined) { - Readable = require('stream').Readable; - } - const readable = new Readable({ - objectMode: true, - read: () => { - this.resume(); - }, - destroy: (err, cb) => { - this.off('line', lineListener); - this.off('close', closeListener); - this.close(); - cb(err); - }, - }); - const lineListener = (input) => { - if (!readable.push(input)) { - // TODO(rexagod): drain to resume flow - this.pause(); - } - }; - const closeListener = () => { - readable.push(null); - }; - const errorListener = (err) => { - readable.destroy(err); - }; - this.on('error', errorListener); - this.on('line', lineListener); - this.on('close', closeListener); - this[kLineObjectStream] = readable; + if (!this[kLineObjectStream]) { + this[kLineObjectStream] = require( + 'internal/readline/eventsToAsyncIteratorFactory' + )( + this, { + itemEvents: ['line'], + closeEvents: ['close'] + }); } - - return this[kLineObjectStream][SymbolAsyncIterator](); + return this[kLineObjectStream]; } } diff --git a/test/parallel/test-readline-async-iterators-backpressure.js b/test/parallel/test-readline-async-iterators-backpressure.js index 2ca124dde5b890..fddaeb368593e4 100644 --- a/test/parallel/test-readline-async-iterators-backpressure.js +++ b/test/parallel/test-readline-async-iterators-backpressure.js @@ -6,11 +6,17 @@ const { Readable } = require('stream'); const readline = require('readline'); const CONTENT = 'content'; -const TOTAL_LINES = 18; +const LINES_PER_PUSH = 2051; +const REPETITIONS = 3; (async () => { const readable = new Readable({ read() {} }); - readable.push(`${CONTENT}\n`.repeat(TOTAL_LINES)); + let salt = 0; + for (let i = 0; i < REPETITIONS; i++) { + readable.push(`${CONTENT}\n`.repeat(LINES_PER_PUSH + i)); + salt += i; + } + const TOTAL_LINES = LINES_PER_PUSH * REPETITIONS + salt; const rli = readline.createInterface({ input: readable, @@ -18,7 +24,7 @@ const TOTAL_LINES = 18; }); const it = rli[Symbol.asyncIterator](); - const highWaterMark = it.stream.readableHighWaterMark; + const highWaterMark = it.highWaterMark; // For this test to work, we have to queue up more than the number of // highWaterMark items in rli. Make sure that is the case. @@ -26,13 +32,15 @@ const TOTAL_LINES = 18; let iterations = 0; let readableEnded = false; + let notPaused = 0; for await (const line of it) { assert.strictEqual(readableEnded, false); - assert.strictEqual(line, CONTENT); - - const expectedPaused = TOTAL_LINES - iterations > highWaterMark; - assert.strictEqual(readable.isPaused(), expectedPaused); + assert.ok(it.queueSize <= TOTAL_LINES); + assert.strictEqual(readable.isPaused(), it.queueSize >= 1); + if (!readable.isPaused()) { + notPaused++; + } iterations += 1; @@ -45,4 +53,5 @@ const TOTAL_LINES = 18; } assert.strictEqual(iterations, TOTAL_LINES); + assert.strictEqual(notPaused, REPETITIONS); })().then(common.mustCall()); diff --git a/test/parallel/test-readline-async-iterators.js b/test/parallel/test-readline-async-iterators.js index 2aa557a3363486..71d36ca41d49f9 100644 --- a/test/parallel/test-readline-async-iterators.js +++ b/test/parallel/test-readline-async-iterators.js @@ -4,6 +4,7 @@ const common = require('../common'); const fs = require('fs'); const { join } = require('path'); const readline = require('readline'); +const { Readable } = require('stream'); const assert = require('assert'); const tmpdir = require('../common/tmpdir'); @@ -63,7 +64,6 @@ async function testMutual() { // This outer loop should only iterate once. assert.strictEqual(iterated, false); iterated = true; - iteratedLines.push(k); for await (const l of rli) { iteratedLines.push(l); @@ -74,4 +74,115 @@ async function testMutual() { } } -testSimple().then(testMutual).then(common.mustCall()); +async function testPerformance() { + const loremIpsum = `Lorem ipsum dolor sit amet, consectetur adipiscing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. +Dui accumsan sit amet nulla facilisi morbi tempus iaculis urna. +Eget dolor morbi non arcu risus quis varius quam quisque. +Lacus viverra vitae congue eu consequat ac felis donec. +Amet porttitor eget dolor morbi non arcu. +Velit ut tortor pretium viverra suspendisse. +Mauris nunc congue nisi vitae suscipit tellus. +Amet nisl suscipit adipiscing bibendum est ultricies integer. +Sit amet dictum sit amet justo donec enim diam. +Condimentum mattis pellentesque id nibh tortor id aliquet lectus proin. +Diam in arcu cursus euismod quis viverra nibh. +`; + + const REPETITIONS = 10000; + const SAMPLE = 100; + const THRESHOLD = 81; + + function getLoremIpsumStream() { + const readable = Readable({ + objectMode: true, + }); + let i = 0; + readable._read = () => readable.push( + i++ >= REPETITIONS ? null : loremIpsum + ); + return readable; + } + + function oldWay() { + const readable = new Readable({ + objectMode: true, + read: () => { + this.resume(); + }, + destroy: (err, cb) => { + this.off('line', lineListener); + this.off('close', closeListener); + this.close(); + cb(err); + }, + }); + const lineListener = (input) => { + if (!readable.push(input)) { + // TODO(rexagod): drain to resume flow + this.pause(); + } + }; + const closeListener = () => { + readable.push(null); + }; + const errorListener = (err) => { + readable.destroy(err); + }; + this.on('error', errorListener); + this.on('line', lineListener); + this.on('close', closeListener); + return readable[Symbol.asyncIterator](); + } + + function getAvg(mean, x, n) { + return (mean * n + x) / (n + 1); + } + + let totalTimeOldWay = 0; + let totalTimeNewWay = 0; + let totalCharsOldWay = 0; + let totalCharsNewWay = 0; + const linesOldWay = []; + const linesNewWay = []; + + for (let time = 0; time < SAMPLE; time++) { + const rlOldWay = readline.createInterface({ + input: getLoremIpsumStream(), + }); + let currenttotalTimeOldWay = Date.now(); + for await (const line of oldWay.call(rlOldWay)) { + totalCharsOldWay += line.length; + if (time === 0) { + linesOldWay.push(line); + } + } + currenttotalTimeOldWay = Date.now() - currenttotalTimeOldWay; + totalTimeOldWay = getAvg(totalTimeOldWay, currenttotalTimeOldWay, SAMPLE); + + const rlNewWay = readline.createInterface({ + input: getLoremIpsumStream(), + }); + let currentTotalTimeNewWay = Date.now(); + for await (const line of rlNewWay) { + totalCharsNewWay += line.length; + if (time === 0) { + linesNewWay.push(line); + } + } + currentTotalTimeNewWay = Date.now() - currentTotalTimeNewWay; + totalTimeNewWay = getAvg(totalTimeNewWay, currentTotalTimeNewWay, SAMPLE); + } + + assert.strictEqual(totalCharsOldWay, totalCharsNewWay); + assert.strictEqual(linesOldWay.length, linesNewWay.length); + linesOldWay.forEach((line, index) => + assert.strictEqual(line, linesNewWay[index]) + ); + const percentage = totalTimeNewWay * 100 / totalTimeOldWay; + assert.ok(percentage <= THRESHOLD, `Failed: ${totalTimeNewWay} isn't lesser than ${THRESHOLD}% of ${totalTimeOldWay}. Actual percentage: ${percentage.toFixed(2)}%`); +} + +testSimple() + .then(testMutual) + .then(testPerformance) + .then(common.mustCall());