Skip to content

Commit

Permalink
feat(operator): add timeInterval operator
Browse files Browse the repository at this point in the history
  • Loading branch information
kwonoj committed Oct 6, 2015
1 parent 57ad986 commit 6cc0615
Show file tree
Hide file tree
Showing 6 changed files with 179 additions and 13 deletions.
20 changes: 20 additions & 0 deletions perf/micro/current-thread-scheduler/operators/timeinterval.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldTimeIntervalWithCurrentThreadScheduler = RxOld.Observable.interval(25, RxOld.Scheduler.currentThread)
.take(5).timeInterval(RxOld.Scheduler.currentThread);
var newTimeIntervalWithCurrentThreadScheduler = RxNew.Observable.interval(25, RxNew.Scheduler.immediate)
.take(5).timeInterval(RxNew.Scheduler.immediate);

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old timeInterval() with current thread scheduler', function () {
oldTimeIntervalWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
})
.add('new timeInterval() with current thread scheduler', function () {
newTimeIntervalWithCurrentThreadScheduler.subscribe(_next, _error, _complete);
});
};
20 changes: 20 additions & 0 deletions perf/micro/immediate-scheduler/operators/timeinterval.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
var RxOld = require('rx');
var RxNew = require('../../../../index');

module.exports = function (suite) {
var oldTimeIntervalWithImmediateScheduler = RxOld.Observable.interval(25, RxOld.Scheduler.immediate)
.take(5).timeInterval(RxOld.Scheduler.immediate);
var newTimeIntervalWithImmediateScheduler = RxNew.Observable.interval(25)
.take(5).timeInterval();

function _next(x) { }
function _error(e) { }
function _complete() { }
return suite
.add('old timeInterval() with immediate scheduler', function () {
oldTimeIntervalWithImmediateScheduler.subscribe(_next, _error, _complete);
})
.add('new timeInterval() with immediate scheduler', function () {
newTimeIntervalWithImmediateScheduler.subscribe(_next, _error, _complete);
});
};
79 changes: 79 additions & 0 deletions spec/operators/timeInterval-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/* globals describe, it, expect, expectObservable, hot, rxTestScheduler */
var Rx = require('../../dist/cjs/Rx.KitchenSink');
var Observable = Rx.Observable;

describe('Observable.prototype.timeInterval()', function () {
it('should record interval if source emit elements', function () {
var e1 = hot('--a--^b--c----d---e--|');
var expected = '-w--x----y---z--|';

var expectedValue = {
w: new Rx.TimeInterval('b', 10),
x: new Rx.TimeInterval('c', 30),
y: new Rx.TimeInterval('d', 50),
z: new Rx.TimeInterval('e', 40)
};

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected, expectedValue);
});

it('should completes without record interval if source does not emits', function () {
var e1 = hot('---------|');
var expected = '---------|';

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected);
});

it('should complete immediately if source is empty', function () {
var e1 = Observable.empty();
var expected = '|';

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected);
});

it('should record interval then does not completes if source emits but not completes', function () {
var e1 = hot('-a--b--');
var expected = '-y--z--';

var expectedValue = {
y: new Rx.TimeInterval('a', 10),
z: new Rx.TimeInterval('b', 30)
};

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected, expectedValue);
});

it('should not completes if source never completes', function () {
var e1 = Observable.never();
var expected = '-';

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected);
});

it('raise error if source raises error', function () {
var e1 = hot('---#');
var expected = '---#';

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected);
});

it('should record interval then raise error if source raises error after emit', function () {
var e1 = hot('-a--b--#');
var expected = '-y--z--#';

var expectedValue = {
y: new Rx.TimeInterval('a', 10),
z: new Rx.TimeInterval('b', 30)
};

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected, expectedValue);
});

it('should raise error if source immediately throws', function () {
var error = 'error';
var e1 = Observable.throw(error);
var expected = '#';

expectObservable(e1.timeInterval(rxTestScheduler)).toBe(expected, null, error);
});
});
21 changes: 13 additions & 8 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import Observable from './Observable';
import Operator from './Operator';
import { CoreOperators } from './CoreOperators';
import { default as IScheduler } from './Scheduler';

interface KitchenSinkOperators<T> extends CoreOperators<T> {
isEmpty?: () => Observable<boolean>;
elementAt?: (index: number, defaultValue?: any) => Observable<T>;
distinctUntilKeyChanged?: (key: string, compare?: (x: any, y: any) => boolean, thisArg?: any) => Observable<T>;
find?: (predicate: (value: T, index: number, source:Observable<T>) => boolean, thisArg?: any) => Observable<T>;
findIndex?: (predicate: (value: T, index: number, source:Observable<T>) => boolean, thisArg?: any) => Observable<number>;
find?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<T>;
findIndex?: (predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any) => Observable<number>;
timeInterval?: <T>(scheduler?: IScheduler) => Observable<T>;
}

// operators
Expand Down Expand Up @@ -45,7 +47,7 @@ import IntervalObservable from './observables/IntervalObservable';
Observable.interval = IntervalObservable.create;

import mergeStatic from './operators/merge-static';
Observable.merge = mergeStatic
Observable.merge = mergeStatic;

import InfiniteObservable from './observables/InfiniteObservable';
Observable.never = InfiniteObservable.create;
Expand All @@ -62,9 +64,7 @@ import TimerObservable from './observables/TimerObservable';
Observable.timer = TimerObservable.create;

import zipStatic from './operators/zip-static';
Observable.zip = zipStatic


Observable.zip = zipStatic;

// Operators
const observableProto = (<KitchenSinkOperators<any>>Observable.prototype);
Expand Down Expand Up @@ -263,6 +263,9 @@ observableProto.takeUntil = takeUntil;
import throttle from './operators/throttle';
observableProto.throttle = throttle;

import timeInterval from './operators/extended/timeInterval';
observableProto.timeInterval = timeInterval;

import timeout from './operators/timeout';
observableProto.timeout = timeout;

Expand Down Expand Up @@ -312,7 +315,8 @@ import nextTick from './schedulers/nextTick';
import immediate from './schedulers/immediate';
import NextTickScheduler from './schedulers/NextTickScheduler';
import ImmediateScheduler from './schedulers/ImmediateScheduler';
import {TestScheduler} from './testing/TestScheduler';
import { TimeInterval } from './operators/extended/timeInterval';
import { TestScheduler } from './testing/TestScheduler';
import VirtualTimeScheduler from './schedulers/VirtualTimeScheduler';

var Scheduler = {
Expand All @@ -333,5 +337,6 @@ export {
EmptyError,
ArgumentOutOfRangeError,
TestScheduler,
VirtualTimeScheduler
VirtualTimeScheduler,
TimeInterval
};
8 changes: 3 additions & 5 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import IntervalObservable from './observables/IntervalObservable';
Observable.interval = IntervalObservable.create;

import mergeStatic from './operators/merge-static';
Observable.merge = mergeStatic
Observable.merge = mergeStatic;

import InfiniteObservable from './observables/InfiniteObservable';
Observable.never = InfiniteObservable.create;
Expand All @@ -52,9 +52,7 @@ import TimerObservable from './observables/TimerObservable';
Observable.timer = TimerObservable.create;

import zipStatic from './operators/zip-static';
Observable.zip = zipStatic


Observable.zip = zipStatic;

// Operators
import { CoreOperators } from './CoreOperators';
Expand Down Expand Up @@ -305,5 +303,5 @@ export {
ConnectableObservable,
Notification,
EmptyError,
ArgumentOutOfRangeError,
ArgumentOutOfRangeError
};
44 changes: 44 additions & 0 deletions src/operators/extended/timeInterval.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import Operator from '../../Operator';
import Observer from '../../Observer';
import Observable from '../../Observable';
import Subscriber from '../../Subscriber';
import Scheduler from '../../Scheduler';
import immediate from '../../schedulers/immediate';

export default function timeInterval<T>(scheduler: Scheduler = immediate): Observable<TimeInterval> {
return this.lift(new TimeIntervalOperator(scheduler));
}

export class TimeInterval {
constructor(public value: any, public interval: number) {

}
};

class TimeIntervalOperator<TimeInterval, R> implements Operator<TimeInterval, R> {
constructor(private scheduler: Scheduler) {

}

call(observer: Subscriber<TimeInterval>): Subscriber<TimeInterval> {
return new TimeIntervalSubscriber(observer, this.scheduler);
}
}

class TimeIntervalSubscriber<TimeInterval> extends Subscriber<TimeInterval> {
private lastTime: number = 0;

constructor(destination: Subscriber<TimeInterval>, private scheduler: Scheduler) {
super(destination);

this.lastTime = scheduler.now();
}

_next(value: TimeInterval) {
let now = this.scheduler.now();
let span = now - this.lastTime;
this.lastTime = now;

this.destination.next(new TimeInterval(value, span));
}
}

0 comments on commit 6cc0615

Please sign in to comment.