Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

readline: buffer line events in async iterator #41708

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,7 @@ function on(emitter, event, options) {
error = err;
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
return PromiseResolve(createIterResult(undefined, true));
},

[SymbolAsyncIterator]() {
Expand Down
71 changes: 36 additions & 35 deletions lib/internal/readline/interface.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ const {
getStringWidth,
stripVTControlCharacters,
} = require('internal/util/inspect');
const EventEmitter = require('events');
const { EventEmitter } = require('events');
const {
charLengthAt,
charLengthLeft,
Expand Down Expand Up @@ -140,6 +140,41 @@ function InterfaceConstructor(input, output, completer, terminal) {

FunctionPrototypeCall(EventEmitter, this);

{
if (Readable === undefined) {
Readable = require('stream').Readable;
}
const readable = new Readable({
objectMode: true,
read: () => {
this.resume();
},
destroy: (err, cb) => {
this.off('line', lineListener);
this.off('close', closeListener);
this.close();
cb(err);
},
});
const lineListener = (input) => {
if (!readable.push(input)) {
// TODO(rexagod): drain to resume flow
this.pause();
}
};
const closeListener = () => {
readable.push(null);
};
const errorListener = (err) => {
readable.destroy(err);
};
this.on('error', errorListener);
this.on('line', lineListener);
this.on('close', closeListener);
this[kLineObjectStream] = readable;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the current implementation does not close the readline.Interface when the async iterable is closed. This can be easily fixed if we want by doing readable.once('close', () => this.close()) though I am not sure which behavior is better.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should close it. The pattern for async iterators is that the stream is fully consumed after the loop. Maybe add an option?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should close it too but was hesitant to change the existing behavior. If you agree it should be closed I'll close it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so.

}


let history;
let historySize;
let removeHistoryDuplicates = false;
Expand Down Expand Up @@ -280,7 +315,6 @@ function InterfaceConstructor(input, output, completer, terminal) {
self[kRefreshLine]();
}

this[kLineObjectStream] = undefined;

input.on('error', onerror);

Expand Down Expand Up @@ -1324,39 +1358,6 @@ class Interface extends InterfaceConstructor {
* @returns {InterfaceAsyncIterator}
*/
[SymbolAsyncIterator]() {
if (this[kLineObjectStream] === undefined) {
if (Readable === undefined) {
Readable = require('stream').Readable;
}
const readable = new Readable({
objectMode: true,
read: () => {
this.resume();
},
destroy: (err, cb) => {
this.off('line', lineListener);
this.off('close', closeListener);
this.close();
cb(err);
},
});
const lineListener = (input) => {
if (!readable.push(input)) {
// TODO(rexagod): drain to resume flow
this.pause();
}
};
const closeListener = () => {
readable.push(null);
};
const errorListener = (err) => {
readable.destroy(err);
};
this.on('error', errorListener);
this.on('line', lineListener);
this.on('close', closeListener);
this[kLineObjectStream] = readable;
}

return this[kLineObjectStream][SymbolAsyncIterator]();
}
Expand Down
30 changes: 30 additions & 0 deletions test/parallel/test-readline-buffers.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import '../common/index.mjs';

import { createInterface } from 'readline';
import { Readable } from 'stream';
import { setImmediate } from 'timers/promises';
import { deepStrictEqual } from 'assert';

const stream = Readable.from(async function* () {
for (let i = 0; i < 50; i++) {
yield new Buffer.from(i + '\r\n', 'utf-8');
await Promise.resolve();
}
}(), { objectMode: false });

// Promises don't keep the event loop alive so to avoid the process exiting
// below we create an interval;
const interval = setInterval(() => { }, 1000);

const rl = createInterface({
input: stream,
crlfDelay: Infinity
});

await setImmediate();
const result = [];
for await (const item of rl) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like it's redundant await, isn't it?

result.push(item);
}
deepStrictEqual(result, Object.keys(Array(50).fill()));
clearInterval(interval);