Skip to content

Commit

Permalink
refactor(every): perf improvements for scalar and array observables
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Oct 2, 2015
1 parent d11f32e commit d5120d2
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 5 deletions.
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/every-predicate-array.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(x) {
return x === 'hi';
}

var oldEveryPredicateArgs = RxOld.Observable.fromArray([1, 2, 3, 4, 5, 6, 7, 8, 9, 10], RxOld.Scheduler.immediate).every(predicate);
var newEveryPredicateArgs = RxNew.Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10).every(predicate);

return suite
.add('old static array observable every(predicate) with immediate scheduler', function () {
oldEveryPredicateArgs.subscribe(_next, _error, _complete);
})
.add('new static array observable every(predicate) with immediate scheduler', function () {
newEveryPredicateArgs.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
24 changes: 24 additions & 0 deletions perf/micro/immediate-scheduler/operators/every-predicate-scalar.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {

var predicate = function(x) {
return x === 'hi';
}

var oldEveryPredicateArgs = RxOld.Observable.of('hi', RxOld.Scheduler.immediate).every(predicate);
var newEveryPredicateArgs = RxNew.Observable.of('hi').every(predicate);

return suite
.add('old scalar observable with every(predicate) with immediate scheduler', function () {
oldEveryPredicateArgs.subscribe(_next, _error, _complete);
})
.add('new scalar observable with every(predicate) with immediate scheduler', function () {
newEveryPredicateArgs.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e){ }
function _complete(){ }
};
2 changes: 1 addition & 1 deletion src/observables/ArrayObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ export default class ArrayObservable<T> extends Observable<T> {
// value used if Array has one value and _isScalar
value: any;

constructor(private array: T[], private scheduler?: Scheduler) {
constructor(public array: T[], public scheduler?: Scheduler) {
super();
if(!scheduler && array.length === 1) {
this._isScalar = true;
Expand Down
2 changes: 1 addition & 1 deletion src/observables/PromiseObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export default class PromiseObservable<T> extends Observable<T> {
return new PromiseObservable(promise, scheduler);
}

constructor(private promise: Promise<T>, private scheduler: Scheduler) {
constructor(private promise: Promise<T>, public scheduler: Scheduler) {
super();
}

Expand Down
2 changes: 1 addition & 1 deletion src/observables/ScalarObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ export default class ScalarObservable<T> extends Observable<T> {

_isScalar: boolean = true;

constructor(public value: T, private scheduler?: Scheduler) {
constructor(public value: T, public scheduler?: Scheduler) {
super();
}

Expand Down
28 changes: 26 additions & 2 deletions src/operators/every.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,38 @@
import Operator from '../Operator';
import Observer from '../Observer';
import Observable from '../Observable';
import ScalarObservable from '../observables/ScalarObservable';
import ArrayObservable from '../observables/ArrayObservable';
import ErrorObservable from '../observables/ErrorObservable';
import Subscriber from '../Subscriber';

import immediate from '../schedulers/immediate';
import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';

export default function every<T>(predicate: (value: T, index: number, source: Observable<T>) => boolean, thisArg?: any): Observable<T>{
return this.lift(new EveryOperator(predicate, thisArg, this));
const source = this;
let result;

if(source._isScalar) {
result = tryCatch(predicate)(source.value, 0, source);
if(result === errorObject) {
return new ErrorObservable(errorObject.e, source.scheduler);
} else {
return new ScalarObservable(result, source.scheduler);
}
}

if(source instanceof ArrayObservable) {
const array = (<ArrayObservable<T>>source).array;
let result = tryCatch((array, predicate) => array.every(<any>predicate))(array, predicate);
if(result === errorObject) {
return new ErrorObservable(errorObject.e, source.scheduler);
} else {
return new ScalarObservable(result, source.scheduler);
}
}
return source.lift(new EveryOperator(predicate, thisArg, source));
}

class EveryOperator<T, R> implements Operator<T, R> {
Expand Down

0 comments on commit d5120d2

Please sign in to comment.