From 16bd691f2bbd076e4d4b5c4eaeed5307ed0ccd18 Mon Sep 17 00:00:00 2001
From: kwonoj <kwon.ohjoong@gmail.com>
Date: Wed, 30 Sep 2015 01:59:29 -0700
Subject: [PATCH] fix(timeout): update behavior of timeout, timeoutWith

---
 spec/operators/timeout-spec.js     |  79 +++++++++-------
 spec/operators/timeoutWith-spec.js | 143 ++++++++++++++++++++++++++---
 src/operators/timeout.ts           |  66 ++++++++++---
 src/operators/timeoutWith.ts       |  80 ++++++++++++----
 4 files changed, 288 insertions(+), 80 deletions(-)

diff --git a/spec/operators/timeout-spec.js b/spec/operators/timeout-spec.js
index 185678e9dd..15ec279f54 100644
--- a/spec/operators/timeout-spec.js
+++ b/spec/operators/timeout-spec.js
@@ -1,44 +1,53 @@
-/* globals describe, it, expect */
+/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */
 var Rx = require('../../dist/cjs/Rx');
 var Observable = Rx.Observable;
 
-describe('Observable.prototype.timeout', function () {
-  it('should timeout after a specified delay', function (done) {
-    Observable.never().timeout(100)
-      .subscribe(function (x) {
-        throw 'should not next';
-      }, function (err) {
-        expect(err.message).toBe('timeout');
-        done();
-      }, function () {
-        throw 'should not complete';
-      });
-  }, 2000);
+describe('Observable.prototype.timeout()', function () {
+  var defaultTimeoutError = new Error('timeout');
   
-  it('should timeout after a delay and send the passed error', function (done) {
-    Observable.never().timeout(100, 'hello')
-      .subscribe(function () {
-        throw 'should not next';
-      }, function (err) {
-        expect(err).toBe('hello');
-        done();
-      }, function () {
-        throw 'should not complete';
-      })
+  it('should timeout after a specified timeout period', function () {
+    var e1 = Observable.never();
+    var expected = '-----#';
+    
+    expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected, null, defaultTimeoutError);
+  });
+  
+  it('should timeout after specified timeout period and send the passed error', function () {
+    var e1 = Observable.never();
+    var expected = '-----#';
+    var value = 'hello';
+    
+    expectObservable(e1.timeout(50, value, rxTestScheduler)).toBe(expected, null, value);
+  });
+  
+  it('should not timeout if source completes within absolute timeout period', function() {
+    var e1 =   hot('--a--b--c--d--e--|');
+    var expected = '--a--b--c--d--e--|';
+    
+    var timeoutValue = new Date(Date.now() + (expected.length + 2) * 10);
+    
+    expectObservable(e1.timeout(timeoutValue, null, rxTestScheduler)).toBe(expected);
   });
   
+  it('should not timeout if source emits within timeout period', function() {
+    var e1 =   hot('--a--b--c--d--e--|');
+    var expected = '--a--b--c--d--e--|';
+    
+    expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected);
+  });
   
-  it('should timeout at a specified Date', function (done) {
-    var date = new Date(Date.now() + 100);
+  it('should timeout after a specified timeout period between emit with default error while source emits', function () {
+    var e1 =   hot('---a---b---c------d---e---|');
+    var expected = '---a---b---c----#';
     
-    Observable.never().timeout(date)
-      .subscribe(function (x) {
-        throw 'should not next';
-      }, function (err) {
-        expect(err.message).toBe('timeout');
-        done();
-      }, function () {
-        throw 'should not complete';
-      });
-  }, 2000);
+    expectObservable(e1.timeout(50, null, rxTestScheduler)).toBe(expected, {a: 'a', b: 'b', c: 'c'}, defaultTimeoutError);
+  });
+  
+  it('should timeout after a specified delay with passed error while source emits', function () {
+    var value = 'hello';
+    var e1 =   hot('---a---b---c------d---e---|');
+    var expected = '---a---b---c----#';
+    
+    expectObservable(e1.timeout(50, value, rxTestScheduler)).toBe(expected, {a: 'a', b: 'b', c: 'c'}, value);
+  });
 });
\ No newline at end of file
diff --git a/spec/operators/timeoutWith-spec.js b/spec/operators/timeoutWith-spec.js
index ed418f2726..da51359442 100644
--- a/spec/operators/timeoutWith-spec.js
+++ b/spec/operators/timeoutWith-spec.js
@@ -1,23 +1,136 @@
-/* globals describe, it, expect */
+/* globals describe, it, expect, expectObservable, hot, cold, rxTestScheduler */
 var Rx = require('../../dist/cjs/Rx');
 var Observable = Rx.Observable;
 
-describe('Observable.prototype.timeoutWith', function () {
-  it('should timeout after a specified delay then subscribe to the passed observable', function (done) {
-    var expected = [1, 2, 3];
-    Observable.never().timeoutWith(100, Observable.of(1,2,3))
-      .subscribe(function (x) {
-        expect(x).toBe(expected.shift());
-      }, null, done);
+describe('Observable.prototype.timeoutWith()', function () {
+  it('should timeout after a specified period then subscribe to the passed observable', function () {
+    var e1 = Observable.never();
+    var e2 =  cold('--x--y--z--|');
+    var expected = '-------x--y--z--|';
+   
+    expectObservable(e1.timeoutWith(50, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should timeout at a specified date then subscribe to the passed observable', function (done) {
+    var expected = ['x', 'y', 'z'];
+    var e1 = Observable.never();
+    var e2 = Observable.fromArray(expected);
+    
+    var res = [];
+    e1.timeoutWith(new Date(Date.now() + 100), e2)
+      .subscribe(function (x) { 
+          res.push(x);
+        }, function(x) {
+          throw 'should not be called';
+        }, function() {
+          expect(res).toEqual(expected);
+          done();
+      });
   }, 2000);
   
+  it('should timeout after a specified period between emit then subscribe to the passed observable when source emits', function () {
+    var e1 =     hot('---a---b------c---|');
+    var e2 =    cold('-x-y-|');
+    var expected =   '---a---b----x-y-|';
+   
+    expectObservable(e1.timeoutWith(40, e2, rxTestScheduler)).toBe(expected);
+  });
   
-  it('should timeout at a specified date then subscribe to the passed observable', function (done) {
-    var expected = [1, 2, 3];
-    var date = new Date(Date.now() + 100);
-    Observable.never().timeoutWith(date, Observable.of(1,2,3))
-      .subscribe(function (x) {
-        expect(x).toBe(expected.shift());
-      }, null, done);
+  it('should timeout after a specified period then subscribe to the passed observable when source is empty', function () {
+    var e1 =   hot('-------------|');
+    var e2 =  cold('----x----|');
+    var expected = '--------------x----|';
+    
+    expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should timeout after a specified period between emit then never completes if other source does not complete', function () {
+    var e1 =   hot('--a--b--------c--d--|');
+    var e2 =  cold('-');
+    var expected = '--a--b----';
+    
+    expectObservable(e1.timeoutWith(40, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should timeout after a specified period then subscribe to the passed observable when source raises error after timeout', function () {
+    var e1 =   hot('-------------#');
+    var e2 =  cold('----x----|');
+    var expected = '--------------x----|';
+    
+    expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
+  });
+
+  it('should timeout after a specified period between emit then never completes if other source emits but not complete', function () {
+    var e1 =   hot('-------------|');
+    var e2 =  cold('----x----');
+    var expected = '--------------x----';
+    
+    expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should not timeout if source completes within timeout period', function () {
+    var e1 =   hot('-----|');
+    var e2 =  cold('----x----');
+    var expected = '-----|';
+    
+    expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should not timeout if source raises error within timeout period', function () {
+    var e1 =   hot('-----#');
+    var e2 =  cold('----x----|');
+    var expected = '-----#';
+    
+    expectObservable(e1.timeoutWith(100, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should not timeout if source emits within timeout period', function() {
+    var e1 =   hot('--a--b--c--d--e--|');
+    var e2 =  cold('----x----|');
+    var expected = '--a--b--c--d--e--|';
+    
+    expectObservable(e1.timeoutWith(50, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should timeout after specified Date then subscribe to the passed observable', function(done) {
+    var e1 = Observable.interval(40).take(5);
+    var e2 = Observable.of(100);
+    
+    var res = [];
+    e1.timeoutWith(new Date(Date.now() + 100), e2)
+      .subscribe(function (x) { 
+          res.push(x);
+        }, function(x) {
+          throw 'should not be called';
+        }, function() {
+          expect(res).toEqual([0, 1, 100]);
+          done();
+      });
   }, 2000);
+  
+  it('should not timeout if source completes within specified Date', function() {
+    var e1 =   hot('--a--b--c--d--e--|');
+    var e2 =  cold('--x--|');
+    var expected = '--a--b--c--d--e--|';
+    
+    var timeoutValue = new Date(Date.now() + (expected.length + 2) * 10);
+    
+    expectObservable(e1.timeoutWith(timeoutValue, e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should not timeout if source raises error within specified Date', function() {
+    var e1 =   hot('---a---#');
+    var e2 =  cold('--x--|');
+    var expected = '---a---#';
+    
+    expectObservable(e1.timeoutWith(new Date(Date.now() + 100), e2, rxTestScheduler)).toBe(expected);
+  });
+  
+  it('should timeout specified Date  after specified Date then never completes if other source does not complete', function() {
+    var e1 =   hot('---a---b---c---d---e---|');
+    var e2 =  cold('-')
+    var expected = '---a---b--';
+    
+    expectObservable(e1.timeoutWith(new Date(Date.now() + 100), e2, rxTestScheduler)).toBe(expected);
+  });
 });
\ No newline at end of file
diff --git a/src/operators/timeout.ts b/src/operators/timeout.ts
index 341433971d..9d792f6149 100644
--- a/src/operators/timeout.ts
+++ b/src/operators/timeout.ts
@@ -7,34 +7,72 @@ import Subscription from '../Subscription';
 import isDate from '../util/isDate';
 
 export default function timeout(due: number|Date, errorToSend: any = null, scheduler: Scheduler = immediate) {
-  let waitFor = isDate(due) ? (+due - Date.now()) : <number>due;
-  return this.lift(new TimeoutOperator(waitFor, errorToSend, scheduler));
+  let absoluteTimeout = isDate(due);
+  let waitFor = absoluteTimeout ? (+due - Date.now()) : <number>due;
+  
+  return this.lift(new TimeoutOperator(waitFor, absoluteTimeout, errorToSend, scheduler));
 }
 
 class TimeoutOperator<T, R> implements Operator<T, R> {
-  constructor(private waitFor: number, private errorToSend: any, private scheduler: Scheduler) { 
+  constructor(private waitFor: number, private absoluteTimeout:boolean, private errorToSend: any, private scheduler: Scheduler) { 
   }
   
   call(subscriber: Subscriber<R>) {
-    return new TimeoutSubscriber(subscriber, this.waitFor, this.errorToSend, this.scheduler);
+    return new TimeoutSubscriber(subscriber, this.absoluteTimeout, this.waitFor, this.errorToSend, this.scheduler);
   }
 }
 
 class TimeoutSubscriber<T> extends Subscriber<T> {
-  timeoutSubscription: Subscription<any>;
+  private index: number = 0;
+  private _previousIndex: number = 0;
+  get previousIndex():number {
+    return this._previousIndex;
+  }
+  private _hasCompleted: boolean = false;
+  get hasCompleted(): boolean {
+    return this._hasCompleted;
+  }
   
-  constructor(destination: Subscriber<T>, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
+  constructor(destination: Subscriber<T>, private absoluteTimeout:boolean, private waitFor: number, private errorToSend: any, private scheduler: Scheduler) {
     super(destination);
-    let delay = waitFor;
-    scheduler.schedule(dispatchTimeout, delay, { subscriber: this });
+    this.scheduleTimeout();
+  }
+  
+  private static dispatchTimeout(state: any): void{
+    const source = state.subscriber;
+    const currentIndex = state.index;
+    
+    if (!source.completed && source.previousIndex === currentIndex) {
+      source.notifyTimeout();
+    }
+  }
+  
+  private scheduleTimeout():void {
+    let currentIndex = this.index;
+    this.scheduler.schedule(TimeoutSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
+    this.index++;
+    this._previousIndex = currentIndex;
   }
   
-  sendTimeoutError() {
+  _next(value: T) {
+    this.destination.next(value);
+    
+    if (!this.absoluteTimeout) {
+      this.scheduleTimeout();
+    }
+  }
+  
+  _error(err) {
+    this.destination.error(err);
+    this._hasCompleted = true;
+  }
+  
+  _complete() {
+    this.destination.complete();
+    this._hasCompleted = true;
+  }
+  
+  notifyTimeout() {
     this.error(this.errorToSend || new Error('timeout'));
   }
-}
-
-function dispatchTimeout<T>(state: { subscriber: TimeoutSubscriber<T> }) {
-  const subscriber = state.subscriber;
-  subscriber.sendTimeoutError();
 }
\ No newline at end of file
diff --git a/src/operators/timeoutWith.ts b/src/operators/timeoutWith.ts
index 7cb38ca929..c4f5902002 100644
--- a/src/operators/timeoutWith.ts
+++ b/src/operators/timeoutWith.ts
@@ -6,37 +6,85 @@ import immediate from '../schedulers/immediate';
 import Subscription from '../Subscription';
 import Observable from '../Observable';
 import isDate from '../util/isDate';
+import OuterSubscriber from '../OuterSubscriber';
+import subscribeToResult from '../util/subscribeToResult';
 
 export default function timeoutWith(due: number|Date, withObservable: Observable<any>, scheduler: Scheduler = immediate) {
-  let waitFor = isDate(due) ? (+due - Date.now()) : <number>due;
-  return this.lift(new TimeoutWithOperator(waitFor, withObservable, scheduler));
+  let absoluteTimeout = isDate(due);
+  let waitFor = absoluteTimeout ? (+due - Date.now()) : <number>due;
+  
+  return this.lift(new TimeoutWithOperator(waitFor, absoluteTimeout, withObservable, scheduler));
 }
 
 class TimeoutWithOperator<T, R> implements Operator<T, R> {
-  constructor(private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) { 
+  constructor(private waitFor: number, private absoluteTimeout:boolean, private withObservable: Observable<any>, private scheduler: Scheduler) { 
   }
   
   call(subscriber: Subscriber<R>) {
-    return new TimeoutWithSubscriber(subscriber, this.waitFor, this.withObservable, this.scheduler);
+    return new TimeoutWithSubscriber(subscriber, this.absoluteTimeout, this.waitFor, this.withObservable, this.scheduler);
   }
 }
 
-class TimeoutWithSubscriber<T> extends Subscriber<T> {
-  timeoutSubscription: Subscription<any>;
+class TimeoutWithSubscriber<T, R> extends OuterSubscriber<T, R> {
+  private timeoutSubscription: Subscription<T> = undefined;
+  private timedOut: boolean = false;
+  private index: number = 0;
+  private _previousIndex: number = 0;
+  get previousIndex():number {
+    return this._previousIndex;
+  }
+  private _hasCompleted: boolean = false;
+  get hasCompleted(): boolean {
+    return this._hasCompleted;
+  }
   
-  constructor(destination: Subscriber<T>, private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) {
+  constructor(destination: Subscriber<T>, private absoluteTimeout:boolean, private waitFor: number, private withObservable: Observable<any>, private scheduler: Scheduler) {
     super(destination);
-    let delay = waitFor;
-    scheduler.schedule(dispatchTimeout, delay, { subscriber: this });
+    this.scheduleTimeout();
   }
   
-  handleTimeout() {
-    const withObservable = this.withObservable;
-    this.add(withObservable.subscribe(this));
+  private static dispatchTimeout(state: any): void{
+    const source = state.subscriber;
+    const currentIndex = state.index;
+    
+    if (!source.completed && source.previousIndex === currentIndex) {
+      source.handleTimeout();
+    }
+  }
+  
+  private scheduleTimeout():void {
+    let currentIndex = this.index;
+    this.scheduler.schedule(TimeoutWithSubscriber.dispatchTimeout, this.waitFor, { subscriber: this, index: currentIndex });
+    this.index++;
+    this._previousIndex = currentIndex;
   }
-}
 
-function dispatchTimeout<T>(state: { subscriber: TimeoutWithSubscriber<T> }) {
-  const subscriber = state.subscriber;
-  subscriber.handleTimeout();
+  _next(value: T) {
+    if (!this.timedOut) {
+      this.destination.next(value);
+      if (!this.absoluteTimeout) {
+        this.scheduleTimeout();
+      }
+    }
+  }
+  
+  _error(err) {
+    if (!this.timedOut) {
+      this.destination.error(err);
+      this._hasCompleted = true;
+    }
+  }
+  
+  _complete() {
+    if (!this.timedOut) {
+      this.destination.complete();
+      this._hasCompleted = true;
+    }
+  }
+    
+  handleTimeout(): void {
+    const withObservable = this.withObservable;
+    this.timedOut = true;
+    this.add(this.timeoutSubscription = subscribeToResult(this, withObservable));
+  } 
 }
\ No newline at end of file