Skip to content

Commit

Permalink
perf(shareReplay): use correct default paramter (#4130)
Browse files Browse the repository at this point in the history
* perf(shareReplay): use correct default paramter

* test(ReplaySubject): combination of bufferSize and windowTime
  • Loading branch information
nename0 authored and benlesh committed Sep 11, 2018
1 parent a83e498 commit 114e771
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 3 deletions.
13 changes: 12 additions & 1 deletion spec/operators/shareReplay-spec.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { hot, cold, expectObservable, expectSubscriptions } from '../helpers/marble-testing';
import { shareReplay, mergeMapTo, retry, take } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { Observable, interval, Operator, Observer } from 'rxjs';
import { Observable, interval, Operator, Observer, of } from 'rxjs';

declare function asDiagram(arg: string): Function;
declare const rxTestScheduler: TestScheduler;
Expand Down Expand Up @@ -176,6 +177,16 @@ describe('shareReplay operator', () => {
expect(results).to.deep.equal([0, 1, 2, 4, 5, 6, 7, 8, 9]);
});

it('when no windowTime is given ReplaySubject should be in _infiniteTimeWindow mode', () => {
const spy = sinon.spy(rxTestScheduler, 'now');

of(1)
.pipe(shareReplay(1, undefined, rxTestScheduler))
.subscribe();
spy.restore();
expect(spy, 'ReplaySubject should not call scheduler.now() when no windowTime is given').to.be.not.called;
});

it('should not break lift() composability', (done: MochaDone) => {
class MyCustomObservable<T> extends Observable<T> {
lift<R>(operator: Operator<T, R>): Observable<R> {
Expand Down
16 changes: 16 additions & 0 deletions spec/subjects/ReplaySubject-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,22 @@ describe('ReplaySubject', () => {
)).toBe(sourceTemplate);
expectObservable(subscriber1).toBe(expected1);
});

it('should only replay bufferSize items when 40 time units ago more were emited', () => {
const replaySubject = new ReplaySubject<string>(2, 40, rxTestScheduler);
function feedNextIntoSubject(x: string) { replaySubject.next(x); }
function feedErrorIntoSubject(err: any) { replaySubject.error(err); }
function feedCompleteIntoSubject() { replaySubject.complete(); }

const sourceTemplate = '1234 |';
const subscriber1 = hot(' (a|)').mergeMapTo(replaySubject);
const expected1 = ' (34) |';

expectObservable(hot(sourceTemplate).do(
feedNextIntoSubject, feedErrorIntoSubject, feedCompleteIntoSubject
)).toBe(sourceTemplate);
expectObservable(subscriber1).toBe(expected1);
});
});

it('should be an Observer which can be given to Observable.subscribe', () => {
Expand Down
4 changes: 2 additions & 2 deletions src/internal/operators/shareReplay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import { Subscriber } from '../Subscriber';
* @see {@link publishReplay}
*
* @param {Number} [bufferSize=Number.POSITIVE_INFINITY] Maximum element count of the replay buffer.
* @param {Number} [windowTime=Number.MAX_VALUE] Maximum time length of the replay buffer in milliseconds.
* @param {Number} [windowTime=Number.POSITIVE_INFINITY] Maximum time length of the replay buffer in milliseconds.
* @param {Scheduler} [scheduler] Scheduler where connected observers within the selector function
* will be invoked on.
* @return {Observable} An observable sequence that contains the elements of a sequence produced
Expand All @@ -48,7 +48,7 @@ import { Subscriber } from '../Subscriber';
*/
export function shareReplay<T>(
bufferSize: number = Number.POSITIVE_INFINITY,
windowTime: number = Number.MAX_VALUE,
windowTime: number = Number.POSITIVE_INFINITY,
scheduler?: SchedulerLike
): MonoTypeOperatorFunction<T> {
return (source: Observable<T>) => source.lift(shareReplayOperator(bufferSize, windowTime, scheduler));
Expand Down

0 comments on commit 114e771

Please sign in to comment.