Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

events: simplify on implementation #41851

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 37 additions & 94 deletions lib/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -976,13 +976,6 @@ async function once(emitter, name, options = {}) {
});
}

const AsyncIteratorPrototype = ObjectGetPrototypeOf(
ObjectGetPrototypeOf(async function* () {}).prototype);

function createIterResult(value, done) {
return { value, done };
}

function eventTargetAgnosticRemoveListener(emitter, name, listener, flags) {
if (typeof emitter.removeListener === 'function') {
emitter.removeListener(name, listener);
Expand Down Expand Up @@ -1017,80 +1010,15 @@ function eventTargetAgnosticAddListener(emitter, name, listener, flags) {
* @returns {AsyncIterator}
*/
function on(emitter, event, options) {
const queue = [];
ronag marked this conversation as resolved.
Show resolved Hide resolved
let resume = null;
let error = null;

const signal = options?.signal;
validateAbortSignal(signal, 'options.signal');
if (signal?.aborted)
if (signal?.aborted) {
throw new AbortError(undefined, { cause: signal?.reason });

const unconsumedEvents = [];
const unconsumedPromises = [];
let error = null;
let finished = false;

const iterator = ObjectSetPrototypeOf({
next() {
// First, we consume all unread events
const value = unconsumedEvents.shift();
if (value) {
return PromiseResolve(createIterResult(value, false));
}

// Then we error, if an error happened
// This happens one time if at all, because after 'error'
// we stop listening
if (error) {
const p = PromiseReject(error);
// Only the first element errors
error = null;
return p;
}

// If the iterator is finished, resolve to done
if (finished) {
return PromiseResolve(createIterResult(undefined, true));
}

// Wait until an event happens
return new Promise(function(resolve, reject) {
unconsumedPromises.push({ resolve, reject });
});
},

return() {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);

if (signal) {
eventTargetAgnosticRemoveListener(
signal,
'abort',
abortListener,
{ once: true });
}

finished = true;

for (const promise of unconsumedPromises) {
promise.resolve(createIterResult(undefined, true));
}

return PromiseResolve(createIterResult(undefined, true));
},

throw(err) {
if (!err || !(err instanceof Error)) {
throw new ERR_INVALID_ARG_TYPE('EventEmitter.AsyncIterator',
'Error', err);
}
error = err;
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
},

[SymbolAsyncIterator]() {
return this;
}
}, AsyncIteratorPrototype);
}

eventTargetAgnosticAddListener(emitter, event, eventHandler);
if (event !== 'error' && typeof emitter.on === 'function') {
Expand All @@ -1105,33 +1033,48 @@ function on(emitter, event, options) {
{ once: true });
}

return iterator;
function errorHandler (err) {
error = err;
if (resume) {
resume(Promise.reject(err));
resume = null;
}
}

function abortListener() {
errorHandler(new AbortError(undefined, { cause: signal?.reason }));
}

function eventHandler(...args) {
const promise = ArrayPrototypeShift(unconsumedPromises);
if (promise) {
promise.resolve(createIterResult(args, false));
if (resume) {
resume(args);
resume = null;
} else {
unconsumedEvents.push(args);
queue.push(args);
}
}

function errorHandler(err) {
finished = true;
return async function * () {
try {
while (true) {
while (queue.length) {
if (error) {
throw error;
}
yield queue.shift();
}

const toError = ArrayPrototypeShift(unconsumedPromises);
if (error) {
ronag marked this conversation as resolved.
Show resolved Hide resolved
throw error;
}

if (toError) {
toError.reject(err);
} else {
// The next time we call next()
error = err;
yield await new Promise(resolve => {
resume = resolve;
});
}
} finally {
eventTargetAgnosticRemoveListener(emitter, event, eventHandler);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the behavior here is to drop all events when the iterator is returned right?

So if I .next() 5 times and then .return I would get { done: true, value: undefined } on all 5?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I believe so

eventTargetAgnosticRemoveListener(emitter, 'error', errorHandler);
ronag marked this conversation as resolved.
Show resolved Hide resolved
}

iterator.return();
}
}()
}
75 changes: 1 addition & 74 deletions test/parallel/test-events-on-async-iterator.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async function basic() {
for await (const event of iterable) {
const current = expected.shift();

assert.deepStrictEqual(current, event);
assert.deepStrictEqual(event, current);

if (expected.length === 0) {
break;
Expand Down Expand Up @@ -113,39 +113,6 @@ async function throwInLoop() {
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function next() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why these tests deleted?

Copy link
Member Author

@ronag ronag Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't work with an async generator implementation. Please do have a look at it. But I can't find a way to make that test pass with an async generator and assuming async generators are "correct" then the test is "incorrect".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They don't work with an async generator implementation

What is the output with the async iterator implementation?

These tests specifically check that if the user asked for two events and then closed the iterator they still get those two events and not one event and then the iterator closing. That behavior is quite explicit - though we can talk about changing it (it'd be a breaking change though I doubt a lot of users are relying on this behavior)

Copy link
Member Author

@ronag ronag Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the problem here is the return() which doesn't actually guarantee that the last iterable.next() (where you expect { done: true }) will ever resolve since the async generator will never reach the yield statement while waiting for the promise to resolve. As far as I know there is not way to do this in an async generator. It's not a problem for for await loops so I'm not quite sure what exactly the spec or users would expect out of this use case?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

@ronag ronag Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't see? Bad link?

Copy link
Member Author

@ronag ronag Feb 4, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know about the specifics of this but it seems weird to me that we want to achieve a behaviour which is not achievable with an async generator. Feels a bit like outside of spec. Maybe I'm a bit too hung up on assuming async generator is the "correct" way.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know about the specifics of this but it seems weird to me that we want to achieve a behaviour which is not achievable with an async generator.

Not dropping these events is why the implementation didn't use an async generator initially - it's OK to change that if you think the simplification is worth losing that API guarantee.

It's true users who for... await will next() one at a time but implementations (think from) can ask for multiple values at once and this changes the behavior of the last chunks in the stream.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you think the simplification is worth losing that API guarantee.

I think so. But I'm not sure if my opinion is sufficient here. I don't understand why we wanted this API guarantee to begin with.

const ee = new EventEmitter();
const iterable = on(ee, 'foo');

process.nextTick(function() {
ee.emit('foo', 'bar');
ee.emit('foo', 42);
iterable.return();
});

const results = await Promise.all([
iterable.next(),
iterable.next(),
iterable.next(),
]);

assert.deepStrictEqual(results, [{
value: ['bar'],
done: false
}, {
value: [42],
done: false
}, {
value: undefined,
done: true
}]);

assert.deepStrictEqual(await iterable.next(), {
value: undefined,
done: true
});
}

async function nextError() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');
Expand Down Expand Up @@ -177,44 +144,6 @@ async function nextError() {
assert.strictEqual(ee.listeners('error').length, 0);
}

async function iterableThrow() {
const ee = new EventEmitter();
const iterable = on(ee, 'foo');

process.nextTick(() => {
ee.emit('foo', 'bar');
ee.emit('foo', 42); // lost in the queue
iterable.throw(_err);
});

const _err = new Error('kaboom');
let thrown = false;

assert.throws(() => {
// No argument
iterable.throw();
}, {
message: 'The "EventEmitter.AsyncIterator" property must be' +
' an instance of Error. Received undefined',
name: 'TypeError'
});

const expected = [['bar'], [42]];

try {
for await (const event of iterable) {
assert.deepStrictEqual(event, expected.shift());
}
} catch (err) {
thrown = true;
assert.strictEqual(err, _err);
}
assert.strictEqual(thrown, true);
assert.strictEqual(expected.length, 0);
assert.strictEqual(ee.listenerCount('foo'), 0);
assert.strictEqual(ee.listenerCount('error'), 0);
}

async function eventTarget() {
const et = new EventTarget();
const tick = () => et.dispatchEvent(new Event('tick'));
Expand Down Expand Up @@ -370,9 +299,7 @@ async function run() {
error,
errorDelayed,
throwInLoop,
next,
nextError,
iterableThrow,
eventTarget,
errorListenerCount,
nodeEventTarget,
Expand Down