From 534141353e1f09421ec6cfcd1be5f32d3d7ac2ff Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Wed, 26 Jan 2022 21:32:48 +0200 Subject: [PATCH 1/3] readline: buffer line events in async iterator Beforehand the async iterator for readline was created lazily when the stream was accessed which meant no events were buffered. In addition stream handling was modernized and improved in the process to support backpressure correctly. A test was added to assert this. Fixes: https://github.com/nodejs/node/issues/28565 Fixes: https://github.com/nodejs/node/issues/33463 --- lib/events.js | 1 + lib/internal/readline/interface.js | 49 +++++++------------------ test/parallel/test-readline-buffers.mjs | 30 +++++++++++++++ 3 files changed, 45 insertions(+), 35 deletions(-) create mode 100644 test/parallel/test-readline-buffers.mjs diff --git a/lib/events.js b/lib/events.js index f722b17aecae0d..57dc6a47614b07 100644 --- a/lib/events.js +++ b/lib/events.js @@ -1085,6 +1085,7 @@ function on(emitter, event, options) { error = err; eventTargetAgnosticRemoveListener(emitter, event, eventHandler); eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler); + return PromiseResolve(createIterResult(undefined, true)); }, [SymbolAsyncIterator]() { diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index 5c9cb94ced5817..3e30710be28401 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -50,7 +50,7 @@ const { getStringWidth, stripVTControlCharacters, } = require('internal/util/inspect'); -const EventEmitter = require('events'); +const { EventEmitter, on } = require('events'); const { charLengthAt, charLengthLeft, @@ -140,6 +140,17 @@ function InterfaceConstructor(input, output, completer, terminal) { FunctionPrototypeCall(EventEmitter, this); + { + if (Readable === undefined) { + Readable = require('stream').Readable; + } + const onIterator = on(this, 'line'); + this.once('close', () => onIterator.return()); + const readable = Readable.from(onIterator).map((line) => line[0]); + this[kLineObjectStream] = readable; + } + + let history; let historySize; let removeHistoryDuplicates = false; @@ -180,6 +191,8 @@ function InterfaceConstructor(input, output, completer, terminal) { crlfDelay = input.crlfDelay; input = input.input; + + // Create underlying stream early so that it buffers data } if (completer !== undefined && typeof completer !== 'function') { @@ -280,7 +293,6 @@ function InterfaceConstructor(input, output, completer, terminal) { self[kRefreshLine](); } - this[kLineObjectStream] = undefined; input.on('error', onerror); @@ -1324,39 +1336,6 @@ class Interface extends InterfaceConstructor { * @returns {InterfaceAsyncIterator} */ [SymbolAsyncIterator]() { - if (this[kLineObjectStream] === undefined) { - if (Readable === undefined) { - Readable = require('stream').Readable; - } - const readable = new Readable({ - objectMode: true, - read: () => { - this.resume(); - }, - destroy: (err, cb) => { - this.off('line', lineListener); - this.off('close', closeListener); - this.close(); - cb(err); - }, - }); - const lineListener = (input) => { - if (!readable.push(input)) { - // TODO(rexagod): drain to resume flow - this.pause(); - } - }; - const closeListener = () => { - readable.push(null); - }; - const errorListener = (err) => { - readable.destroy(err); - }; - this.on('error', errorListener); - this.on('line', lineListener); - this.on('close', closeListener); - this[kLineObjectStream] = readable; - } return this[kLineObjectStream][SymbolAsyncIterator](); } diff --git a/test/parallel/test-readline-buffers.mjs b/test/parallel/test-readline-buffers.mjs new file mode 100644 index 00000000000000..eb0ac0e557945d --- /dev/null +++ b/test/parallel/test-readline-buffers.mjs @@ -0,0 +1,30 @@ +import '../common/index.mjs'; + +import { createInterface } from 'readline'; +import { Readable } from 'stream'; +import { setImmediate } from 'timers/promises'; +import { deepStrictEqual } from 'assert'; + +const stream = Readable.from(async function* () { + for (let i = 0; i < 50; i++) { + yield new Buffer.from(i + '\r\n', 'utf-8'); + await Promise.resolve(); + } +}(), { objectMode: false }); + +// Promises don't keep the event loop alive so to avoid the process exiting +// below we create an interval; +const interval = setInterval(() => { }, 1000); + +const rl = createInterface({ + input: stream, + crlfDelay: Infinity +}); + +await setImmediate(); +const result = []; +for await (const item of rl) { + result.push(item); +} +deepStrictEqual(result, Object.keys(Array(50).fill())); +clearInterval(interval); From 073dbfdeb9ed00333840c6d7b102d367d06df6a1 Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Wed, 26 Jan 2022 21:36:54 +0200 Subject: [PATCH 2/3] fixup! --- lib/internal/readline/interface.js | 2 -- 1 file changed, 2 deletions(-) diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index 3e30710be28401..f19a713e86781c 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -191,8 +191,6 @@ function InterfaceConstructor(input, output, completer, terminal) { crlfDelay = input.crlfDelay; input = input.input; - - // Create underlying stream early so that it buffers data } if (completer !== undefined && typeof completer !== 'function') { From 5b45c4a4b916fcaa3b7010030091d539f950b6bb Mon Sep 17 00:00:00 2001 From: Benjamin Gruenbaum Date: Fri, 28 Jan 2022 21:35:25 +0200 Subject: [PATCH 3/3] fixup! revert old behavior, keep bugfix --- lib/internal/readline/interface.js | 32 ++++++++++++++++++++++++++---- 1 file changed, 28 insertions(+), 4 deletions(-) diff --git a/lib/internal/readline/interface.js b/lib/internal/readline/interface.js index f19a713e86781c..2f15d0f21d12d6 100644 --- a/lib/internal/readline/interface.js +++ b/lib/internal/readline/interface.js @@ -50,7 +50,7 @@ const { getStringWidth, stripVTControlCharacters, } = require('internal/util/inspect'); -const { EventEmitter, on } = require('events'); +const { EventEmitter } = require('events'); const { charLengthAt, charLengthLeft, @@ -144,9 +144,33 @@ function InterfaceConstructor(input, output, completer, terminal) { if (Readable === undefined) { Readable = require('stream').Readable; } - const onIterator = on(this, 'line'); - this.once('close', () => onIterator.return()); - const readable = Readable.from(onIterator).map((line) => line[0]); + const readable = new Readable({ + objectMode: true, + read: () => { + this.resume(); + }, + destroy: (err, cb) => { + this.off('line', lineListener); + this.off('close', closeListener); + this.close(); + cb(err); + }, + }); + const lineListener = (input) => { + if (!readable.push(input)) { + // TODO(rexagod): drain to resume flow + this.pause(); + } + }; + const closeListener = () => { + readable.push(null); + }; + const errorListener = (err) => { + readable.destroy(err); + }; + this.on('error', errorListener); + this.on('line', lineListener); + this.on('close', closeListener); this[kLineObjectStream] = readable; }