From 9f5d5107ddb8518d91f9a48b8676693c5a7a0cdf Mon Sep 17 00:00:00 2001 From: Ben Lesh Date: Thu, 20 Aug 2015 16:58:07 -0700 Subject: [PATCH] feat(operator): add window operators: window, windowWhen, windowTime, windowCount, windowToggle closes #195 --- spec/operators/window-spec.js | 20 +++++ spec/operators/windowCount-spec.js | 21 +++++ spec/operators/windowTime-spec.js | 36 +++++++++ spec/operators/windowToggle-spec.js | 17 ++++ spec/operators/windowWhen-spec.js | 20 +++++ src/Observable.ts | 7 +- src/Rx.ts | 10 +++ src/Scheduler.ts | 12 ++- src/operators/window.ts | 65 +++++++++++++++ src/operators/windowCount.ts | 73 +++++++++++++++++ src/operators/windowTime.ts | 106 ++++++++++++++++++++++++ src/operators/windowToggle.ts | 120 ++++++++++++++++++++++++++++ src/operators/windowWhen.ts | 90 +++++++++++++++++++++ src/subjects/ReplaySubject.ts | 14 ++-- 14 files changed, 602 insertions(+), 9 deletions(-) create mode 100644 spec/operators/window-spec.js create mode 100644 spec/operators/windowCount-spec.js create mode 100644 spec/operators/windowTime-spec.js create mode 100644 spec/operators/windowToggle-spec.js create mode 100644 spec/operators/windowWhen-spec.js create mode 100644 src/operators/window.ts create mode 100644 src/operators/windowCount.ts create mode 100644 src/operators/windowTime.ts create mode 100644 src/operators/windowToggle.ts create mode 100644 src/operators/windowWhen.ts diff --git a/spec/operators/window-spec.js b/spec/operators/window-spec.js new file mode 100644 index 0000000000..c1339025e9 --- /dev/null +++ b/spec/operators/window-spec.js @@ -0,0 +1,20 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.window', function () { + it('should emit windows that close and reopen', function (done) { + var expected = [ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8] + ]; + Observable.interval(100) + .window(Observable.interval(320)) + .take(3) + .flatMap(function (x) { return x.toArray(); }) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/windowCount-spec.js b/spec/operators/windowCount-spec.js new file mode 100644 index 0000000000..fb6c1b5f54 --- /dev/null +++ b/spec/operators/windowCount-spec.js @@ -0,0 +1,21 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.windowCount', function () { + it('should emit windows at intervals', function (done) { + var expected = [ + [0, 1], + [1, 2], + [2, 3], + [3] + ]; + Observable.range(0, 4) + .windowCount(2, 1) + .take(3) + .flatMap(function (x) { return x.toArray(); }) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/windowTime-spec.js b/spec/operators/windowTime-spec.js new file mode 100644 index 0000000000..c035f4bbaa --- /dev/null +++ b/spec/operators/windowTime-spec.js @@ -0,0 +1,36 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.windowTime', function () { + it('should emit windows at intervals', function (done) { + var expected = [ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8] + ]; + Observable.interval(100) + .windowTime(320) + .take(3) + .flatMap(function (x) { return x.toArray(); }) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); + + + it('should emit windows that have been created at intervals and close after the specified delay', function (done) { + var expected = [ + [0, 1, 2, 3, 4], + [2, 3, 4, 5, 6], + [4, 5, 6, 7, 8] + ]; + Observable.interval(100) + .windowTime(520, 220) + .take(3) + .flatMap(function (x) { return x.toArray(); }) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/windowToggle-spec.js b/spec/operators/windowToggle-spec.js new file mode 100644 index 0000000000..dc251d7fe7 --- /dev/null +++ b/spec/operators/windowToggle-spec.js @@ -0,0 +1,17 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.windowToggle', function () { + it('should emit windows that are opened by an observable from the first argument and closed by an observable returned by the function in the second argument', function (done) { + Observable.interval(100).take(10) + .windowToggle(Observable.timer(320).mapTo('test'), function (n) { + expect(n).toBe('test'); + return Observable.timer(320); + }) + .flatMap(function (w) { return w.toArray(); }) + .subscribe(function (w) { + expect(w).toEqual([3, 4, 5]) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/spec/operators/windowWhen-spec.js b/spec/operators/windowWhen-spec.js new file mode 100644 index 0000000000..dcf17553e7 --- /dev/null +++ b/spec/operators/windowWhen-spec.js @@ -0,0 +1,20 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.windowWhen', function () { + it('should emit windows that close and reopen', function (done) { + var expected = [ + [0, 1, 2], + [3, 4, 5], + [6, 7, 8] + ]; + Observable.interval(100) + .windowWhen(function () { return Observable.timer(320); }) + .take(3) + .flatMap(function (x) { return x.toArray(); }) + .subscribe(function (w) { + expect(w).toEqual(expected.shift()) + }, null, done); + }, 2000); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index c00a0e3e18..0a17005369 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -162,6 +162,11 @@ export default class Observable { repeat: (count: number) => Observable; groupBy: (keySelector: (value:T) => string, durationSelector?: (group:GroupSubject) => Observable, elementSelector?: (value:T) => R) => Observable; - + window: (closingNotifier: Observable) => Observable>; + windowWhen: (closingSelector: () => Observable) => Observable>; + windowToggle: (openings: Observable, closingSelector?: (openValue: O) => Observable) => Observable> + windowTime: (windowTimeSpan: number, windowCreationInterval?: number, scheduler?: Scheduler) => Observable>; + windowCount: (windowSize: number, startWindowEvery: number) => Observable>; + finally: (ensure: () => void, thisArg?: any) => Observable; } \ No newline at end of file diff --git a/src/Rx.ts b/src/Rx.ts index 19b95d44b4..d345c717e4 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -160,8 +160,18 @@ import _finally from './operators/finally'; observableProto.finally = _finally; import groupBy from './operators/groupBy'; +import window from './operators/window'; +import windowWhen from './operators/windowWhen'; +import windowToggle from './operators/windowToggle'; +import windowTime from './operators/windowTime'; +import windowCount from './operators/windowCount'; observableProto.groupBy = groupBy; +observableProto.window = window; +observableProto.windowWhen = windowWhen; +observableProto.windowToggle = windowToggle; +observableProto.windowTime = windowTime; +observableProto.windowCount = windowCount; import delay from './operators/delay'; import throttle from './operators/throttle'; diff --git a/src/Scheduler.ts b/src/Scheduler.ts index 63ce530228..87bd396711 100644 --- a/src/Scheduler.ts +++ b/src/Scheduler.ts @@ -66,6 +66,10 @@ export class Action extends Subscription { } schedule(state?:any): Action { + if (this.isUnsubscribed) { + return this; + } + this.state = state; const scheduler = this.scheduler; scheduler.actions.push(this); @@ -103,6 +107,9 @@ export class NextTickAction extends Action { id: number; schedule(state?:any): Action { + if (this.isUnsubscribed) { + return this; + } this.state = state; @@ -151,7 +158,10 @@ export class FutureAction extends Action { } schedule(state?:any): Action { - + if (this.isUnsubscribed) { + return this; + } + this.state = state; const id = this.id; diff --git a/src/operators/window.ts b/src/operators/window.ts new file mode 100644 index 0000000000..62c93c9d5b --- /dev/null +++ b/src/operators/window.ts @@ -0,0 +1,65 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function window(closingNotifier: Observable) : Observable> { + return this.lift(new WindowOperator(closingNotifier)); +} + +export class WindowOperator implements Operator { + + constructor(private closingNotifier: Observable) { + } + + call(observer: Observer): Observer { + return new WindowSubscriber(observer, this.closingNotifier); + } +} + +export class WindowSubscriber extends Subscriber { + private window: Subject = new Subject(); + + constructor(destination: Observer, private closingNotifier: Observable) { + super(destination); + this.add(closingNotifier.subscribe(new WindowClosingNotifierSubscriber(this))); + this.openWindow(); + } + + _next(value: T) { + this.window.next(value); + } + + _error(err: any) { + this.window.error(err); + this.destination.error(err); + } + + _complete() { + this.window.complete(); + this.destination.complete(); + } + + openWindow() { + const prevWindow = this.window; + if (prevWindow) { + prevWindow.complete(); + } + this.destination.next(this.window = new Subject()); + } +} + +export class WindowClosingNotifierSubscriber extends Subscriber { + constructor(private parent: WindowSubscriber) { + super(null); + } + + _next() { + this.parent.openWindow(); + } +} \ No newline at end of file diff --git a/src/operators/windowCount.ts b/src/operators/windowCount.ts new file mode 100644 index 0000000000..f11f4560ad --- /dev/null +++ b/src/operators/windowCount.ts @@ -0,0 +1,73 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; +import Scheduler from '../Scheduler'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function windowCount(windowSize: number, startWindowEvery: number = 0) : Observable> { + return this.lift(new WindowCountOperator(windowSize, startWindowEvery)); +} + +export class WindowCountOperator implements Operator { + + constructor(private windowSize: number, private startWindowEvery: number) { + } + + call(observer: Observer): Observer { + return new WindowCountSubscriber(observer, this.windowSize, this.startWindowEvery); + } +} + +export class WindowCountSubscriber extends Subscriber { + private windows: { count: number, window: Subject } [] = []; + private count: number = 0; + + constructor(destination: Observer, private windowSize: number, private startWindowEvery: number) { + super(destination); + } + + _next(value: T) { + const count = (this.count += 1); + const startWindowEvery = this.startWindowEvery; + const windowSize = this.windowSize; + const windows = this.windows; + + if (startWindowEvery && count % this.startWindowEvery === 0) { + let window = new Subject(); + windows.push({ count: 0, window }); + this.destination.next(window); + } + + const len = windows.length; + for (let i = 0; i < len; i++) { + let w = windows[i]; + const window = w.window; + window.next(value); + if (windowSize === (w.count += 1)) { + window.complete(); + } + } + } + + _error(err: any) { + const windows = this.windows; + while (windows.length > 0) { + windows.shift().window.error(err); + } + this.destination.error(err); + } + + _complete() { + const windows = this.windows; + while (windows.length > 0) { + windows.shift().window.complete(); + } + this.destination.complete(); + } +} \ No newline at end of file diff --git a/src/operators/windowTime.ts b/src/operators/windowTime.ts new file mode 100644 index 0000000000..6fde895b65 --- /dev/null +++ b/src/operators/windowTime.ts @@ -0,0 +1,106 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; +import { default as Scheduler, Action } from '../Scheduler'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function windowTime(windowTimeSpan: number, windowCreationInterval: number = null, scheduler: Scheduler = Scheduler.nextTick) : Observable> { + return this.lift(new WindowTimeOperator(windowTimeSpan, windowCreationInterval, scheduler)); +} + +export class WindowTimeOperator implements Operator { + + constructor(private windowTimeSpan: number, private windowCreationInterval: number, private scheduler: Scheduler) { + } + + call(observer: Observer): Observer { + return new WindowTimeSubscriber(observer, this.windowTimeSpan, this.windowCreationInterval, this.scheduler); + } +} + +export class WindowTimeSubscriber extends Subscriber { + private windows: Subject[] = []; + + constructor(destination: Observer, private windowTimeSpan: number, private windowCreationInterval: number, private scheduler: Scheduler) { + super(destination); + if (windowCreationInterval !== null && windowCreationInterval >= 0) { + let window = this.openWindow(); + this.add(scheduler.schedule(windowTimeSpan, { subscriber: this, window, context: null }, dispatchWindowClose)) + this.add(scheduler.schedule(windowCreationInterval, { windowTimeSpan, windowCreationInterval, subscriber: this, scheduler }, dispatchWindowCreation)) + } else { + let window = this.openWindow(); + this.add(scheduler.schedule(windowTimeSpan, { subscriber: this, window }, dispatchWindowTimeSpanOnly)); + } + } + + _next(value: T) { + const windows = this.windows; + const len = windows.length; + for (let i = 0; i < len; i++) { + windows[i].next(value); + } + } + + _error(err) { + const windows = this.windows; + while (windows.length > 0) { + windows.shift().error(err); + } + this.destination.error(err); + } + + _complete() { + const windows = this.windows; + while (windows.length > 0) { + windows.shift().complete(); + } + this.destination.complete(); + } + + openWindow(): Subject { + let window = new Subject(); + this.windows.push(window); + this.destination.next(window); + return window; + } + + closeWindow(window: Subject) { + window.complete(); + const windows = this.windows; + windows.splice(windows.indexOf(window), 1); + } +} + +function dispatchWindowTimeSpanOnly(state) { + const subscriber: WindowTimeSubscriber = state.subscriber; + + const prevWindow: Subject = state.window; + if (prevWindow) { + prevWindow.complete(); + } + + let window = subscriber.openWindow(); + (this).schedule({ subscriber, window }); +} + +function dispatchWindowCreation(state) { + let { windowTimeSpan, subscriber, scheduler } = state; + let window = subscriber.openWindow(); + let action = >this; + let context = { action, subscription: null }; + action.add(context.subscription = scheduler.schedule(windowTimeSpan, { subscriber, window, context }, dispatchWindowClose)); + action.schedule(state); +} + +function dispatchWindowClose({ subscriber, window, context }) { + if (context && context.action && context.subscription) { + context.action.remove(context.subscription); + } + subscriber.closeWindow(window); +} \ No newline at end of file diff --git a/src/operators/windowToggle.ts b/src/operators/windowToggle.ts new file mode 100644 index 0000000000..607a4eecf0 --- /dev/null +++ b/src/operators/windowToggle.ts @@ -0,0 +1,120 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function windowToggle(openings: Observable, closingSelector: (openValue: O) => Observable) : Observable> { + return this.lift(new WindowToggleOperator(openings, closingSelector)); +} + +export class WindowToggleOperator implements Operator { + + constructor(private openings: Observable, private closingSelector: (openValue: O) => Observable) { + } + + call(observer: Observer): Observer { + return new WindowToggleSubscriber(observer, this.openings, this.closingSelector); + } +} + +export class WindowToggleSubscriber extends Subscriber { + private windows: Subject[] = []; + private closingNotification: Subscription; + + constructor(destination: Observer, private openings: Observable, private closingSelector: (openValue: O) => Observable) { + super(destination); + this.add(this.openings.subscribe(new WindowToggleOpeningsSubscriber(this))); + } + + _next(value: T) { + const windows = this.windows; + const len = windows.length; + for (let i = 0; i < len; i++) { + windows[i].next(value); + } + } + + _error(err: any) { + const windows = this.windows; + while (windows.length > 0) { + windows.shift().error(err); + } + this.destination.error(err); + } + + _complete() { + const windows = this.windows; + while (windows.length > 0) { + windows.shift().complete(); + } + this.destination.complete(); + } + + openWindow(value: O) { + const window = new Subject(); + this.windows.push(window); + this.destination.next(window); + let windowContext = { + window, + subscription: null + }; + + const closingSelector = this.closingSelector; + let closingNotifier = tryCatch(closingSelector)(value); + if (closingNotifier === errorObject) { + this.error(closingNotifier.e); + } else { + this.add(windowContext.subscription = closingNotifier.subscribe(new WindowClosingNotifierSubscriber(this, windowContext))); + } + } + + closeWindow(windowContext: { subscription: Subscription, window: Subject }) { + const { window, subscription } = windowContext; + const windows = this.windows; + windows.splice(windows.indexOf(window), 1); + window.complete(); + this.remove(subscription); + } +} + +export class WindowClosingNotifierSubscriber extends Subscriber { + constructor(private parent: WindowToggleSubscriber, private windowContext: { window: Subject, subscription: Subscription }) { + super(null); + } + + _next() { + this.parent.closeWindow(this.windowContext); + } + + _error(err) { + this.parent.error(err); + } + + _complete() { + // noop + } +} + +export class WindowToggleOpeningsSubscriber extends Subscriber { + constructor(private parent: WindowToggleSubscriber) { + super(); + } + + _next(value: T) { + this.parent.openWindow(value); + } + + _error(err) { + this.parent.error(err); + } + + _complete() { + // noop + } +} \ No newline at end of file diff --git a/src/operators/windowWhen.ts b/src/operators/windowWhen.ts new file mode 100644 index 0000000000..ad094d142f --- /dev/null +++ b/src/operators/windowWhen.ts @@ -0,0 +1,90 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Subscriber from '../Subscriber'; +import Observable from '../Observable'; +import Subject from '../Subject'; +import Subscription from '../Subscription'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; +import bindCallback from '../util/bindCallback'; + +export default function window(closingSelector: () => Observable) : Observable> { + return this.lift(new WindowOperator(closingSelector)); +} + +export class WindowOperator implements Operator { + + constructor(private closingSelector: () => Observable) { + } + + call(observer: Observer): Observer { + return new WindowSubscriber(observer, this.closingSelector); + } +} + +export class WindowSubscriber extends Subscriber { + private window: Subject = new Subject(); + private closingNotification: Subscription; + + constructor(destination: Observer, private closingSelector: () => Observable) { + super(destination); + this.openWindow(); + } + + _next(value: T) { + this.window.next(value); + } + + _error(err: any) { + this.window.error(err); + this.destination.error(err); + } + + _complete() { + this.window.complete(); + this.destination.complete(); + } + + openWindow() { + const prevClosingNotification = this.closingNotification; + if (prevClosingNotification) { + this.remove(prevClosingNotification); + prevClosingNotification.unsubscribe(); + } + + const prevWindow = this.window; + if (prevWindow) { + prevWindow.complete(); + } + + this.destination.next(this.window = new Subject()); + + let closingNotifier = tryCatch(this.closingSelector)(); + if (closingNotifier === errorObject) { + const err = closingNotifier.e; + this.destination.error(err); + this.window.error(err); + } else { + this.add(this.closingNotification = closingNotifier.subscribe(new WindowClosingNotifierSubscriber(this))); + } + } +} + +export class WindowClosingNotifierSubscriber extends Subscriber { + constructor(private parent: WindowSubscriber) { + super(null); + } + + _next() { + this.parent.openWindow(); + } + + _error(err) { + this.parent.error(err); + } + + _complete() { + // noop + } +} \ No newline at end of file diff --git a/src/subjects/ReplaySubject.ts b/src/subjects/ReplaySubject.ts index 5a075428c5..803e95a512 100644 --- a/src/subjects/ReplaySubject.ts +++ b/src/subjects/ReplaySubject.ts @@ -6,17 +6,17 @@ import Subscription from '../Subscription'; export default class ReplaySubject extends Subject { - bufferSize: number; - windowTime: number; - scheduler: Scheduler; + private bufferSize: number; + private _windowTime: number; + private scheduler: Scheduler; private events: ReplayEvent [] = []; constructor(bufferSize: number = Number.POSITIVE_INFINITY, - windowTime: number = Number.POSITIVE_INFINITY, + _windowTime: number = Number.POSITIVE_INFINITY, scheduler?: Scheduler) { super(); this.bufferSize = bufferSize < 1 ? 1 : bufferSize; - this.windowTime = windowTime < 1 ? 1 : windowTime; + this._windowTime = _windowTime < 1 ? 1 : _windowTime; this.scheduler = scheduler; } @@ -44,7 +44,7 @@ export default class ReplaySubject extends Subject { private _getEvents(now) { const bufferSize = this.bufferSize; - const windowTime = this.windowTime; + const _windowTime = this._windowTime; const events = this.events; let eventsCount = events.length; @@ -54,7 +54,7 @@ export default class ReplaySubject extends Subject { // Start at the front of the list. Break early once // we encounter an event that falls within the window. while(spliceCount < eventsCount) { - if((now - events[spliceCount].time) < windowTime) { + if((now - events[spliceCount].time) < _windowTime) { break; } spliceCount += 1;