diff --git a/lib/events.js b/lib/events.js index 3ba2484ece938f..996658e1d3f565 100644 --- a/lib/events.js +++ b/lib/events.js @@ -1167,14 +1167,8 @@ function on(emitter, event, options = kEmptyObject) { addEventListener(emitter, closeEvents[i], closeHandler); } } - if (signal) { - kResistStopPropagation ??= require('internal/event_target').kResistStopPropagation; - eventTargetAgnosticAddListener( - signal, - 'abort', - abortListener, - { __proto__: null, once: true, [kResistStopPropagation]: true }); - } + + const abortListenerDisposable = signal ? addAbortListener(signal, abortListener) : null; return iterator; @@ -1201,6 +1195,7 @@ function on(emitter, event, options = kEmptyObject) { } function closeHandler() { + abortListenerDisposable?.[SymbolDispose](); removeAll(); finished = true; const doneResult = createIterResult(undefined, true); diff --git a/test/parallel/test-events-on-async-iterator.js b/test/parallel/test-events-on-async-iterator.js index 057af8537f3275..2a68bf987dde28 100644 --- a/test/parallel/test-events-on-async-iterator.js +++ b/test/parallel/test-events-on-async-iterator.js @@ -6,6 +6,7 @@ const assert = require('assert'); const { on, EventEmitter } = require('events'); const { NodeEventTarget, + kEvents } = require('internal/event_target'); async function basic() { @@ -372,6 +373,36 @@ async function abortableOnAfterDone() { }); } +async function abortListenerRemovedAfterComplete() { + const ee = new EventEmitter(); + const ac = new AbortController(); + + const i = setInterval(() => ee.emit('foo', 'foo'), 1); + try { + // Below: either the kEvents map is empty or the 'abort' listener list is empty + + // Return case + const endedIterator = on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].get('abort').size > 0); + endedIterator.return(); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); + + // Throw case + const throwIterator = on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].get('abort').size > 0); + throwIterator.throw(new Error()); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); + + // Abort case + on(ee, 'foo', { signal: ac.signal }); + assert.ok(ac.signal[kEvents].get('abort').size > 0); + ac.abort(new Error()); + assert.strictEqual(ac.signal[kEvents].get('abort')?.size ?? ac.signal[kEvents].size, 0); + } finally { + clearInterval(i); + } +} + async function run() { const funcs = [ basic, @@ -391,6 +422,7 @@ async function run() { eventTargetAbortableOnAfter, eventTargetAbortableOnAfter2, abortableOnAfterDone, + abortListenerRemovedAfterComplete, ]; for (const fn of funcs) {