From fa1535aed7f8dcfe0400f6359c068253032f5973 Mon Sep 17 00:00:00 2001
From: Matteo Collina <hello@matteocollina.com>
Date: Tue, 27 Nov 2018 09:24:48 +0100
Subject: [PATCH] stream: make async iterator .next() always resolve

See: https://github.com/nodejs/readable-stream/issues/387

PR-URL: https://github.com/nodejs/node/pull/24668
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Joyee Cheung <joyeec9h3@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
---
 lib/internal/streams/async_iterator.js        | 15 +++--
 .../test-stream-readable-async-iterators.js   | 65 +++++++++++++++++++
 2 files changed, 75 insertions(+), 5 deletions(-)

diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js
index 7eaef69508690d..cc8e218498f995 100644
--- a/lib/internal/streams/async_iterator.js
+++ b/lib/internal/streams/async_iterator.js
@@ -39,6 +39,11 @@ function onReadable(iter) {
 function wrapForNext(lastPromise, iter) {
   return (resolve, reject) => {
     lastPromise.then(() => {
+      if (iter[kEnded]) {
+        resolve(createIterResult(undefined, true));
+        return;
+      }
+
       iter[kHandlePromise](resolve, reject);
     }, reject);
   };
@@ -61,7 +66,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
     }
 
     if (this[kEnded]) {
-      return Promise.resolve(createIterResult(null, true));
+      return Promise.resolve(createIterResult(undefined, true));
     }
 
     if (this[kStream].destroyed) {
@@ -74,7 +79,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
           if (this[kError]) {
             reject(this[kError]);
           } else {
-            resolve(createIterResult(null, true));
+            resolve(createIterResult(undefined, true));
           }
         });
       });
@@ -115,7 +120,7 @@ const ReadableStreamAsyncIteratorPrototype = Object.setPrototypeOf({
           reject(err);
           return;
         }
-        resolve(createIterResult(null, true));
+        resolve(createIterResult(undefined, true));
       });
     });
   },
@@ -131,7 +136,6 @@ const createReadableStreamAsyncIterator = (stream) => {
       value: stream._readableState.endEmitted,
       writable: true
     },
-    [kLastPromise]: { value: null, writable: true },
     // the function passed to new Promise
     // is cached so we avoid allocating a new
     // closure at every run
@@ -151,6 +155,7 @@ const createReadableStreamAsyncIterator = (stream) => {
       writable: true,
     },
   });
+  iterator[kLastPromise] = null;
 
   finished(stream, (err) => {
     if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
@@ -172,7 +177,7 @@ const createReadableStreamAsyncIterator = (stream) => {
       iterator[kLastPromise] = null;
       iterator[kLastResolve] = null;
       iterator[kLastReject] = null;
-      resolve(createIterResult(null, true));
+      resolve(createIterResult(undefined, true));
     }
     iterator[kEnded] = true;
   });
diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js
index 83540de9defea3..aca1e5bcc9d18d 100644
--- a/test/parallel/test-stream-readable-async-iterators.js
+++ b/test/parallel/test-stream-readable-async-iterators.js
@@ -393,6 +393,71 @@ async function tests() {
       r.destroy(null);
     }
   })();
+
+  await (async () => {
+    console.log('all next promises must be resolved on end');
+    const r = new Readable({
+      objectMode: true,
+      read() {
+      }
+    });
+
+    const b = r[Symbol.asyncIterator]();
+    const c = b.next();
+    const d = b.next();
+    r.push(null);
+    assert.deepStrictEqual(await c, { done: true, value: undefined });
+    assert.deepStrictEqual(await d, { done: true, value: undefined });
+  })();
+
+  await (async () => {
+    console.log('all next promises must be resolved on destroy');
+    const r = new Readable({
+      objectMode: true,
+      read() {
+      }
+    });
+
+    const b = r[Symbol.asyncIterator]();
+    const c = b.next();
+    const d = b.next();
+    r.destroy();
+    assert.deepStrictEqual(await c, { done: true, value: undefined });
+    assert.deepStrictEqual(await d, { done: true, value: undefined });
+  })();
+
+  await (async () => {
+    console.log('all next promises must be resolved on destroy with error');
+    const r = new Readable({
+      objectMode: true,
+      read() {
+      }
+    });
+
+    const b = r[Symbol.asyncIterator]();
+    const c = b.next();
+    const d = b.next();
+    const err = new Error('kaboom');
+    r.destroy(err);
+
+    await Promise.all([(async () => {
+      let e;
+      try {
+        await c;
+      } catch (_e) {
+        e = _e;
+      }
+      assert.strictEqual(e, err);
+    })(), (async () => {
+      let e;
+      try {
+        await d;
+      } catch (_e) {
+        e = _e;
+      }
+      assert.strictEqual(e, err);
+    })()]);
+  })();
 }
 
 // to avoid missing some tests if a promise does not resolve