Skip to content

Commit

Permalink
feat(operator): add first operator
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros authored and benlesh committed Sep 29, 2015
1 parent 8fe9e39 commit 274c233
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 0 deletions.
23 changes: 23 additions & 0 deletions perf/micro/immediate-scheduler/operators/first-predicate.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {
var predicate = function(value, index) {
return value === 20;
};

var oldFirstNoArgs = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).first(predicate);
var newFirstNoArgs = RxNew.Observable.range(0, 50).first(predicate);

return suite
.add('old first(predicate) with immediate scheduler', function () {
oldFirstNoArgs.subscribe(_next, _error, _complete);
})
.add('new first(predicate) with immediate scheduler', function () {
newFirstNoArgs.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e) { }
function _complete() { }
};
19 changes: 19 additions & 0 deletions perf/micro/immediate-scheduler/operators/first.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
var RxOld = require("rx");
var RxNew = require("../../../../index");

module.exports = function (suite) {
var oldFirstNoArgs = RxOld.Observable.range(0, 50, RxOld.Scheduler.immediate).first();
var newFirstNoArgs = RxNew.Observable.range(0, 50).first();

return suite
.add('old first() with immediate scheduler', function () {
oldFirstNoArgs.subscribe(_next, _error, _complete);
})
.add('new first() with immediate scheduler', function () {
newFirstNoArgs.subscribe(_next, _error, _complete);
});

function _next(x) { }
function _error(e) { }
function _complete() { }
};
117 changes: 117 additions & 0 deletions spec/operators/first-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/* globals describe, it, expect, expectObservable, hot, cold */
var Rx = require('../../dist/cjs/Rx');

describe('Observable.prototype.first()', function() {
it('should take the first value of an observable with one value', function() {
var e1 = hot('---(a|)');
var expected = '---(a|)';
expectObservable(e1.first()).toBe(expected)
});

it('should take the first value of an observable with many values', function() {
var e1 = hot('--a--^--b----c---d--|');
var expected = '---(b|)';
expectObservable(e1.first()).toBe(expected)
});

it('should error on empty', function() {
var e1 = hot('--a--^----|');
var expected = '-----#';
expectObservable(e1.first()).toBe(expected, null, new Rx.EmptyError());
});

it('should return the default value if source observable was empty', function() {
var e1 = hot('-----^----|');
var expected = '-----(a|)';
expectObservable(e1.first(null, null, 'a')).toBe(expected);
});

it('should propagate error from the source observable', function() {
var e1 = hot('---^---#');
var expected = '----#';
expectObservable(e1.first()).toBe(expected)
});

it('should go on forever on never', function() {
var e2 = hot('--^-------');
var expected = '--------';
expectObservable(e2.first()).toBe(expected);
});

it('should return first value that matches a predicate', function() {
var e1 = hot('--a-^--b--c--a--c--|');
var expected = '------(c|)';
var predicate = function (value) {
return value === 'c';
};
expectObservable(e1.first(predicate)).toBe(expected);
});

it('should return first value that matches a predicate for odd numbers', function() {
var e1 = hot('--a-^--b--c--d--e--|', {a: 1, b: 2, c: 3, d: 4, e: 5});
var expected = '------(c|)';
var predicate = function (value) {
return value % 2 === 1;
};
expectObservable(e1.first(predicate)).toBe(expected, {c: 3});
});

it('should return first value that matches a predicate using thisArg', function() {
var e1 = hot('--a-^--b--c--d--e--|', {a: 1, b: 2, c: 3, d: 4, e: 5});
var expected = '------(c|)';
var predicate = function (value) {
expect(this).toEqual(42);
return value % 2 === 1;
};
expectObservable(e1.first(predicate, 42)).toBe(expected, {c: 3});
});

it('should error when no value matches the predicate', function() {
var e1 = hot('--a-^--b--c--a--c--|');
var expected = '---------------#';
var predicate = function (value) {
return value === 's';
};
expectObservable(e1.first(predicate)).toBe(expected, null, new Rx.EmptyError());
});

it('should return the default value when no value matches the predicate', function() {
var e1 = hot('--a-^--b--c--a--c--|');
var expected = '---------------(d|)';
var predicate = function (value) {
return value === 's';
};
expectObservable(e1.first(predicate, null, 'd')).toBe(expected);
});

it('should propagate error when no value matches the predicate', function() {
var e1 = hot('--a-^--b--c--a--#');
var expected = '------------#';
var predicate = function (value) {
return value === 's';
};
expectObservable(e1.first(predicate)).toBe(expected);
});

it('should return first value that matches the index in the predicate', function() {
var e1 = hot('--a-^--b--c--a--c--|');
var expected = '---------(a|)';
var predicate = function (value, index) {
return index === 2;
};
expectObservable(e1.first(predicate)).toBe(expected);
});

it('should propagate error from predicate', function() {
var e1 = hot('--a-^--b--c--d--e--|', {a: 1, b: 2, c: 3, d: 4, e: 5});
var expected = '---------#';
var predicate = function (value) {
if (value < 4) {
return false;
} else {
throw 'error';
}
};
expectObservable(e1.first(predicate)).toBe(expected, null, 'error');
});
});
74 changes: 74 additions & 0 deletions src/operators/first.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import Observable from '../Observable';
import Operator from '../Operator';
import Subscriber from '../Subscriber';
import Observer from '../Observer';

import tryCatch from '../util/tryCatch';
import {errorObject} from '../util/errorObject';
import bindCallback from '../util/bindCallback';
import EmptyError from '../util/EmptyError';

export default function first<T>(predicate?: (value: T, index: number, source: Observable<T>) => boolean,
thisArg?: any,
defaultValue?: any): Observable<T> {
return this.lift(new FirstOperator(predicate, thisArg, defaultValue, this));
}

class FirstOperator<T, R> implements Operator<T, R> {
constructor(private predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any,
private defaultValue?: any,
private source?: Observable<T>) {
}

call(observer: Subscriber<R>): Subscriber<T> {
return new FirstSubscriber(
observer, this.predicate, this.thisArg, this.defaultValue, this.source
);
}
}

class FirstSubscriber<T> extends Subscriber<T> {
private predicate: Function;
private index: number = 0;
private hasCompleted: boolean = false;

constructor(destination: Observer<T>,
predicate?: (value: T, index: number, source: Observable<T>) => boolean,
private thisArg?: any,
private defaultValue?: any,
private source?: Observable<T>) {
super(destination);
if (typeof predicate === 'function') {
this.predicate = bindCallback(predicate, thisArg, 3);
}
}

_next(value: T) {
const destination = this.destination;
const predicate = this.predicate;
let passed: any = true;
if (predicate) {
passed = tryCatch(predicate)(value, this.index++, this.source);
if (passed === errorObject) {
destination.error(passed.e);
return;
}
}
if (passed) {
destination.next(value);
destination.complete();
this.hasCompleted = true;
}
}

_complete() {
const destination = this.destination;
if (!this.hasCompleted && typeof this.defaultValue !== 'undefined') {
destination.next(this.defaultValue);
destination.complete();
} else if (!this.hasCompleted) {
destination.error(new EmptyError);
}
}
}

0 comments on commit 274c233

Please sign in to comment.