From 22d286a36d703523356e76f3e706fae6e58704e0 Mon Sep 17 00:00:00 2001
From: Ben Lesh <blesh@netflix.com>
Date: Fri, 14 Oct 2016 13:56:11 -0700
Subject: [PATCH] fix(IteratorObservable): Observables `from` generators will
 now finalize when subscription ends

In order to model the behavior of `for..of` when consuming a Generator, if a `break` is hit in the `for..of`, `return()` is called on the generator and the generator will jump to a `finally` block if it has one. Observables created from generators will now have this same behavior.
```js
Observable.from((function* () {
  try {
    yield 1;
    yield 2;
    yield 3;
  } finally {
    console.log('finalized');
  }
})())
.take(2)
.subscribe(x => console.log(x));

// should log
// 1
// 2
// finalized
```

fixes #1938
---
 spec/observables/IteratorObservable-spec.ts | 63 +++++++++++++++++++++
 src/observable/IteratorObservable.ts        |  6 ++
 2 files changed, 69 insertions(+)

diff --git a/spec/observables/IteratorObservable-spec.ts b/spec/observables/IteratorObservable-spec.ts
index 3149f213a2..63b1d73761 100644
--- a/spec/observables/IteratorObservable-spec.ts
+++ b/spec/observables/IteratorObservable-spec.ts
@@ -1,5 +1,6 @@
 import {expect} from 'chai';
 import * as Rx from '../../dist/cjs/Rx';
+import {queue} from '../../dist/cjs/scheduler/queue';
 import {IteratorObservable} from '../../dist/cjs/observable/IteratorObservable';
 
 declare const expectObservable;
@@ -46,6 +47,68 @@ describe('IteratorObservable', () => {
       );
   });
 
+  it('should finalize generators if the subscription ends', () => {
+    const iterator = {
+      finalized: false,
+      next() {
+        return { value: 'duck', done: false };
+      },
+      return() {
+        this.finalized = true;
+      }
+    };
+
+    const iterable = {
+      [Rx.Symbol.iterator]() {
+        return iterator;
+      }
+    };
+
+    const results = [];
+
+    IteratorObservable.create(iterable)
+      .take(3)
+      .subscribe(
+        x => results.push(x),
+        null,
+        () => results.push('GOOSE!')
+      );
+
+    expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
+    expect(iterator.finalized).to.be.true;
+  });
+
+  it('should finalize generators if the subscription and it is scheduled', () => {
+    const iterator = {
+      finalized: false,
+      next() {
+        return { value: 'duck', done: false };
+      },
+      return() {
+        this.finalized = true;
+      }
+    };
+
+    const iterable = {
+      [Rx.Symbol.iterator]() {
+        return iterator;
+      }
+    };
+
+    const results = [];
+
+    IteratorObservable.create(iterable, queue)
+      .take(3)
+      .subscribe(
+        x => results.push(x),
+        null,
+        () => results.push('GOOSE!')
+      );
+
+    expect(results).to.deep.equal(['duck', 'duck', 'duck', 'GOOSE!']);
+    expect(iterator.finalized).to.be.true;
+  });
+
   it('should emit members of an array iterator on a particular scheduler', () => {
     const source = IteratorObservable.create(
       [10, 20, 30, 40],
diff --git a/src/observable/IteratorObservable.ts b/src/observable/IteratorObservable.ts
index 4195965f25..586afc6ad3 100644
--- a/src/observable/IteratorObservable.ts
+++ b/src/observable/IteratorObservable.ts
@@ -36,6 +36,9 @@ export class IteratorObservable<T> extends Observable<T> {
     state.index = index + 1;
 
     if (subscriber.closed) {
+      if (typeof iterator.return === 'function') {
+        iterator.return();
+      }
       return;
     }
 
@@ -71,6 +74,9 @@ export class IteratorObservable<T> extends Observable<T> {
           subscriber.next(result.value);
         }
         if (subscriber.closed) {
+          if (typeof iterator.return === 'function') {
+            iterator.return();
+          }
           break;
         }
       } while (true);