Skip to content

Commit

Permalink
feat: run subscriber callbacks in the current zone, i.e. zone operato…
Browse files Browse the repository at this point in the history
…r can use most outer zone propagated to a callback it's used in.

feat: Meteor.subscribe and Meteor.autorun allow multiple subscribers for the same subscription and computation.
test: add more tests
  • Loading branch information
barbatus committed Oct 1, 2016
1 parent 17ed794 commit ec08c91
Show file tree
Hide file tree
Showing 17 changed files with 269 additions and 76 deletions.
57 changes: 45 additions & 12 deletions dist/MeteorObservable.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@ var MeteorObservable = (function () {
if (utils_1.isMeteorCallbacks(lastParam)) {
throwInvalidCallback('MeteorObservable.call');
}
var zone = utils_1.forkZone();
return rxjs_1.Observable.create(function (observer) {
Meteor.call.apply(Meteor, [name].concat(args.concat([
function (error, result) {
error ? observer.error(error) :
observer.next(result);
observer.complete();
zone.run(function () {
error ? observer.error(error) :
observer.next(result);
observer.complete();
});
}
])));
});
Expand All @@ -35,25 +38,55 @@ var MeteorObservable = (function () {
if (utils_1.isMeteorCallbacks(lastParam)) {
throwInvalidCallback('MeteorObservable.subscribe');
}
return rxjs_1.Observable.create(function (observer) {
var handler = Meteor.subscribe.apply(Meteor, [name].concat(args.concat([{
var zone = utils_1.forkZone();
var observers = [];
var subscribe = function () {
return Meteor.subscribe.apply(Meteor, [name].concat(args.concat([{
onError: function (error) {
observer.error(error);
zone.run(function () {
observers.forEach(function (observer) { return observer.error(error); });
});
},
onReady: function () {
observer.next();
zone.run(function () {
observers.forEach(function (observer) { return observer.next(); });
});
}
}
])));
return function () { return handler.stop(); };
};
var subHandler = null;
return rxjs_1.Observable.create(function (observer) {
observers.push(observer);
// Execute subscribe lazily.
if (subHandler === null) {
subHandler = subscribe();
}
return function () {
utils_1.removeObserver(observers, observer, function () { return subHandler.stop(); });
};
});
};
MeteorObservable.autorun = function () {
return rxjs_1.Observable.create(function (observer) {
var handler = Tracker.autorun(function (computation) {
observer.next(computation);
var zone = utils_1.forkZone();
var observers = [];
var autorun = function () {
return Tracker.autorun(function (computation) {
zone.run(function () {
observers.forEach(function (observer) { return observer.next(computation); });
});
});
return function () { return handler.stop(); };
};
var handler = null;
return rxjs_1.Observable.create(function (observer) {
observers.push(observer);
// Execute autorun lazily.
if (handler === null) {
handler = autorun();
}
return function () {
utils_1.removeObserver(observers, observer, function () { return handler.stop(); });
};
});
};
return MeteorObservable;
Expand Down
12 changes: 5 additions & 7 deletions dist/ObservableCollection.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"use strict";
var rxjs_1 = require('rxjs');
var ObservableCursor_1 = require('./ObservableCursor');
var utils_1 = require('./utils');
var MongoObservable;
(function (MongoObservable) {
'use strict';
Expand All @@ -10,11 +11,11 @@ var MongoObservable;
MongoObservable.fromExisting = fromExisting;
var Collection = (function () {
function Collection(nameOrExisting, options) {
if (typeof nameOrExisting === 'string') {
this._collection = new Mongo.Collection(nameOrExisting, options);
if (nameOrExisting instanceof Mongo.Collection) {
this._collection = nameOrExisting;
}
else {
this._collection = nameOrExisting;
this._collection = new Mongo.Collection(nameOrExisting, options);
}
}
Object.defineProperty(Collection.prototype, "collection", {
Expand Down Expand Up @@ -95,10 +96,7 @@ var MongoObservable;
return rxjs_1.Observable.create(function (observer) {
observers.push(observer);
return function () {
var index = observers.indexOf(observer);
if (index !== -1) {
observers.splice(index, 1);
}
utils_1.removeObserver(observers, observer);
};
});
};
Expand Down
1 change: 1 addition & 0 deletions dist/ObservableCursor.d.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Observable } from 'rxjs';
export declare class ObservableCursor<T> extends Observable<T[]> {
private _zone;
private _data;
private _cursor;
private _hCursor;
Expand Down
19 changes: 10 additions & 9 deletions dist/ObservableCursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,14 @@ var ObservableCursor = (function (_super) {
_this._hCursor = _this._observeCursor(cursor);
}
return function () {
var index = _this._observers.indexOf(observer);
if (index !== -1) {
_this._observers.splice(index, 1);
}
if (!_this._observers.length) {
_this.stop();
}
utils_1.removeObserver(_this._observers, observer, function () { return _this.stop(); });
};
});
this._data = [];
this._observers = [];
_.extend(this, _.omit(cursor, 'count', 'map'));
this._cursor = cursor;
this._zone = utils_1.forkZone();
}
ObservableCursor.create = function (cursor) {
return new ObservableCursor(cursor);
Expand All @@ -41,10 +36,13 @@ var ObservableCursor = (function (_super) {
configurable: true
});
ObservableCursor.prototype.stop = function () {
var _this = this;
this._zone.run(function () {
_this._runComplete();
});
if (this._hCursor) {
this._hCursor.stop();
}
this._runComplete();
this._hCursor = null;
};
ObservableCursor.prototype.dispose = function () {
Expand Down Expand Up @@ -85,7 +83,10 @@ var ObservableCursor = (function (_super) {
};
;
ObservableCursor.prototype._handleChange = function () {
this._runNext(this._data);
var _this = this;
this._zone.run(function () {
_this._runNext(_this._data);
});
};
;
ObservableCursor.prototype._observeCursor = function (cursor) {
Expand Down
4 changes: 4 additions & 0 deletions dist/utils.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Subscriber } from 'rxjs';
export declare type CallbacksObject = {
onReady?: Function;
onError?: Function;
Expand All @@ -8,4 +9,7 @@ export declare const subscribeEvents: string[];
export declare function isMeteorCallbacks(callbacks: any): boolean;
export declare function isCallbacksObject(callbacks: any): boolean;
export declare const g: any;
export declare function forkZone(): any;
export declare function getZone(): any;
export declare function removeObserver(observers: Subscriber<any>[], observer: Subscriber<any>, onEmpty?: Function): void;
export declare const gZone: any;
34 changes: 34 additions & 0 deletions dist/utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,43 @@ exports.isCallbacksObject = isCallbacksObject;
exports.g = typeof global === 'object' ? global :
typeof window === 'object' ? window :
typeof self === 'object' ? self : this;
var METEOR_RXJS_ZONE = 'meteor-rxjs-zone';
var fakeZone = {
name: METEOR_RXJS_ZONE,
run: function (func) {
return func();
},
fork: function (spec) {
return fakeZone;
}
};
function forkZone() {
if (exports.g.Zone) {
var zone = exports.g.Zone.current;
if (zone.name === METEOR_RXJS_ZONE) {
zone = zone.parent || fakeZone;
}
return zone.fork({ name: METEOR_RXJS_ZONE });
}
return fakeZone;
}
exports.forkZone = forkZone;
function getZone() {
if (exports.g.Zone) {
var zone = exports.g.Zone.current;
if (zone.name === METEOR_RXJS_ZONE) {
return zone.parent;
}
return zone;
}
}
exports.getZone = getZone;
function removeObserver(observers, observer, onEmpty) {
var index = observers.indexOf(observer);
observers.splice(index, 1);
if (observers.length === 0 && onEmpty) {
onEmpty();
}
}
exports.removeObserver = removeObserver;
exports.gZone = exports.g.Zone ? exports.g.Zone.current : fakeZone;
3 changes: 2 additions & 1 deletion dist/zone.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ var __extends = (this && this.__extends) || function (d, b) {
d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
};
var rxjs_1 = require('rxjs');
var utils_1 = require('./utils');
function zone(zone) {
return this.lift(new ZoneOperator(zone || Zone.current));
return this.lift(new ZoneOperator(zone || utils_1.getZone()));
}
exports.zone = zone;
var ZoneOperator = (function () {
Expand Down
3 changes: 1 addition & 2 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@
"rxjs": "^5.0.0-beta.12"
},
"devDependencies": {
"@types/lodash": "^4.14.35",
"@types/mocha": "^2.2.32",
"@types/underscore": "^1.7.33",
"conventional-changelog-cli": "^1.2.0",
"es6-shim": "^0.35.0",
"ghooks": "^1.2.1",
"rxjs": "5.0.0-beta.12",
"rxjs": "^5.0.0-beta.12",
"tslint": "^3.6.0",
"typescript": "^2.0.0",
"typings": "^1.3.0",
Expand Down
63 changes: 49 additions & 14 deletions src/MeteorObservable.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict';

import {Observable, Subscriber} from 'rxjs';
import {isMeteorCallbacks} from './utils';
import {isMeteorCallbacks, forkZone, removeObserver} from './utils';

function throwInvalidCallback(method: string) {
throw new Error(
Expand All @@ -18,46 +18,81 @@ export class MeteorObservable {
throwInvalidCallback('MeteorObservable.call');
}

let zone = forkZone();
return Observable.create((observer: Subscriber<Meteor.Error | T>) => {
Meteor.call(name, ...args.concat([
(error: Meteor.Error, result: T) => {
error ? observer.error(error) :
observer.next(result);
observer.complete();
zone.run(() => {
error ? observer.error(error) :
observer.next(result);
observer.complete();
});
}
]));
});
}

public static subscribe<T>(name: string, ...args: any[]): Observable<T> {
const lastParam = args[args.length - 1];
let lastParam = args[args.length - 1];

if (isMeteorCallbacks(lastParam)) {
throwInvalidCallback('MeteorObservable.subscribe');
}

return Observable.create((observer: Subscriber<Meteor.Error | T>) => {
let handler = Meteor.subscribe(name, ...args.concat([{
let zone = forkZone();
let observers = [];
let subscribe = () => {
return Meteor.subscribe(name, ...args.concat([{
onError: (error: Meteor.Error) => {
observer.error(error);
zone.run(() => {
observers.forEach(observer => observer.error(error));
});
},
onReady: () => {
observer.next();
zone.run(() => {
observers.forEach(observer => observer.next());
});
}
}
]));
};

return () => handler.stop();
let subHandler = null;
return Observable.create((observer: Subscriber<Meteor.Error | T>) => {
observers.push(observer);
// Execute subscribe lazily.
if (subHandler === null) {
subHandler = subscribe();
}
return () => {
removeObserver(observers,
observer, () => subHandler.stop());
};
});
}

public static autorun(): Observable<Tracker.Computation> {
return Observable.create((observer: Subscriber<Meteor.Error | Tracker.Computation>) => {
let handler = Tracker.autorun((computation: Tracker.Computation) => {
observer.next(computation);
let zone = forkZone();
let observers = [];
let autorun = () => {
return Tracker.autorun((computation: Tracker.Computation) => {
zone.run(() => {
observers.forEach(observer => observer.next(computation));
});
});
};

return () => handler.stop();
let handler = null;
return Observable.create((observer: Subscriber<Meteor.Error | Tracker.Computation>) => {
observers.push(observer);
// Execute autorun lazily.
if (handler === null) {
handler = autorun();
}
return () => {
removeObserver(observers,
observer, () => handler.stop());
};
});
}
}
16 changes: 8 additions & 8 deletions src/ObservableCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import {Observable, Subscriber} from 'rxjs';

import {ObservableCursor} from './ObservableCursor';

import {removeObserver} from './utils';

import Selector = Mongo.Selector;
import ObjectID = Mongo.ObjectID;
import SortSpecifier = Mongo.SortSpecifier;
Expand Down Expand Up @@ -32,11 +34,12 @@ export module MongoObservable {
export class Collection<T> {
private _collection: Mongo.Collection<T>;

constructor(nameOrExisting: string | Mongo.Collection<T>, options?: ConstructorOptions) {
if (typeof nameOrExisting === 'string') {
this._collection = new Mongo.Collection<T>(nameOrExisting, options);
} else {
constructor(nameOrExisting: string | Mongo.Collection<T>,
options?: ConstructorOptions) {
if (nameOrExisting instanceof Mongo.Collection) {
this._collection = nameOrExisting;
} else {
this._collection = new Mongo.Collection<T>(nameOrExisting, options);
}
}

Expand Down Expand Up @@ -157,10 +160,7 @@ export module MongoObservable {
return Observable.create((observer: Subscriber<T>) => {
observers.push(observer);
return () => {
let index = observers.indexOf(observer);
if (index !== -1) {
observers.splice(index, 1);
}
removeObserver(observers, observer);
};
});
}
Expand Down
Loading

0 comments on commit ec08c91

Please sign in to comment.