Skip to content

Commit

Permalink
[refactored] _nextOperator to use named args
Browse files Browse the repository at this point in the history
Summary: A continuation of #193

Reviewers: O2 Material Motion, O3 Material JavaScript platform reviewers, #material_motion, featherless

Reviewed By: O2 Material Motion, #material_motion, featherless

Tags: #material_motion

Differential Revision: http://codereview.cc/D3398
  • Loading branch information
appsforartists committed Oct 11, 2017
1 parent bc9cb9f commit 8914c14
Show file tree
Hide file tree
Showing 11 changed files with 82 additions and 84 deletions.
35 changes: 17 additions & 18 deletions packages/core/src/operators/dedupe.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,26 +34,25 @@ export function withDedupe<T, S extends Constructor<ObservableWithFoundationalMo
* Ensures that every value dispatched is different than the previous one.
*/
dedupe(areEqual: EqualityCheck = deepEqual): ObservableWithMotionOperators<T> {
// If upstream observable is synchronous, `_multicast` will skip the first
// value (because its observers will not yet have been registered when
// upstream dispatches). Thus, we manually multicast here.
const lastValueByChannel = new Map<NextChannel<T>, T>();

return this._nextOperator(
(value: T, dispatch: NextChannel<T>) => {
const lastValue = lastValueByChannel.get(dispatch);

if (lastValueByChannel.has(dispatch) && areEqual(value, lastValue)) {
return;
return this._nextOperator({
operation({ emit }) {
let emitted = false;
let lastValue: T;

return ({ upstream }) => {
if (emitted && areEqual(upstream, lastValue)) {
return;
}

// To prevent a potential infinite loop, these flags must be set
// before dispatching the result to the observer.
lastValue = upstream;
emitted = true;

emit(upstream);
}

// To prevent a potential infinite loop, this must be set before
// dispatching the result to the observer
lastValueByChannel.set(dispatch, value);

dispatch(value);
}
);
});
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,17 @@ import {
} from 'sinon';

import {
createMockObserver,
} from 'material-motion-testing-utils';

import {
MotionObservable,
MotionSubject,
} from '../../../observables/';

describe('motionObservable._nextOperator',
() => {
let stream;
let mockObserver;
let subject;
let nextListener;

beforeEach(
() => {
mockObserver = createMockObserver();
stream = new MotionObservable(mockObserver.connect);
subject = new MotionSubject();

nextListener = stub();
}
Expand All @@ -55,17 +49,17 @@ describe('motionObservable._nextOperator',
() => {
const waitAMoment = Promise.resolve();

const makeValuesAsync = (value, nextChannel) => {
waitAMoment.then(
() => {
nextChannel(value);
}
);
};

stream._nextOperator(makeValuesAsync).subscribe(nextListener);
subject._nextOperator({
operation: ({ emit }) => ({ upstream }) => {
waitAMoment.then(
() => {
emit(upstream);
}
);
}
}).subscribe(nextListener);

mockObserver.next(1);
subject.next(1);

expect(nextListener).not.to.have.been.called;
return waitAMoment.then(
Expand All @@ -86,13 +80,13 @@ describe('motionObservable._nextOperator',
this.listener(value);
}

stream._nextOperator(
(value, nextChannel) => nextChannel(value)
).subscribe({
subject._nextOperator({
operation: ({ emit }) => ({ upstream }) => emit(upstream)
}).subscribe({
next: callOwnListener.bind(dictWithListener)
});

mockObserver.next(1);
subject.next(1);

expect(nextListener).to.have.been.calledWith(1);
}
Expand Down
10 changes: 5 additions & 5 deletions packages/core/src/operators/foundation/_filter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,13 @@ export function withFilter<T, S extends Constructor<MotionNextOperable<T>>>(supe
* values that return `true` to the observer.
*/
_filter(predicate: Predicate<T>): ObservableWithMotionOperators<T> {
return this._nextOperator(
(value: T, dispatch: NextChannel<T>) => {
if (predicate(value)) {
dispatch(value);
return this._nextOperator({
operation: ({ emit }) => ({ upstream }) => {
if (predicate(upstream)) {
emit(upstream);
}
}
);
});
}
};
}
8 changes: 4 additions & 4 deletions packages/core/src/operators/foundation/_flattenIterables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ export function withFlattenIterables<T, S extends Constructor<MotionNextOperable
* individually.
*/
_flattenIterables<U>(): ObservableWithMotionOperators<U> {
return this._nextOperator(
(values: T | Iterable<U>, dispatch: NextChannel<U>) => {
return this._nextOperator({
operation: ({ emit }) => ({ upstream: values }) => {
if (isIterable(values)) {
for (const value of values) {
dispatch(value);
emit(value);
}
} else {
dispatch(values as any as U);
emit(values as any as U);
}
}
);
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/operators/foundation/_map.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ export function withMap<T, S extends Constructor<MotionNextOperable<T>>>(supercl
* result to the observer.
*/
_map<U>(transform: (value: T) => U): ObservableWithMotionOperators<U> {
return this._nextOperator(
(value: T, dispatch: NextChannel<U>) => {
dispatch(transform(value));
return this._nextOperator({
operation: ({ emit }) => ({ upstream }) => {
emit(transform(upstream));
}
);
}
Expand Down
16 changes: 11 additions & 5 deletions packages/core/src/operators/foundation/_nextOperator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@ import {

import {
Constructor,
EmittingOperation,
NextChannel,
NextOperation,
Observable,
ObservableWithMotionOperators,
Observer,
} from '../../types';

export type _NextOperatorArgs<T, U> = {
operation: EmittingOperation<{}, T, U>,
}

export interface MotionNextOperable<T> extends Observable<T> {
_nextOperator<U>(operation: NextOperation<T, U>): ObservableWithMotionOperators<U>;
_nextOperator<U>(kwargs: _NextOperatorArgs<T, U>): ObservableWithMotionOperators<U>;
}

export function withNextOperator<T, S extends Constructor<Observable<T>>>(superclass: S): S & Constructor<MotionNextOperable<T>> {
Expand All @@ -42,13 +46,15 @@ export function withNextOperator<T, S extends Constructor<Observable<T>>>(superc
* `next` channel, transform it, and use the supplied callback to dispatch
* the result to the observer's `next` channel.
*/
_nextOperator<U>(operation: NextOperation<T, U>): ObservableWithMotionOperators<U> {
_nextOperator<U>({ operation }: _NextOperatorArgs<T, U>): ObservableWithMotionOperators<U> {
return new MotionObservable(
(observer: Observer<U>) => {
const dispatch: NextChannel<U> = observer.next.bind(observer);
const innerOperation = operation({
emit: observer.next.bind(observer),
});

const subscription = this.subscribe(
(value: T) => operation(value, dispatch)
(value: T) => innerOperation({ upstream: value })
);

return subscription.unsubscribe;
Expand Down
1 change: 0 additions & 1 deletion packages/core/src/operators/foundation/_remember.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

import {
Constructor,
NextOperation,
Observable,
ObservableWithMotionOperators,
Observer,
Expand Down
20 changes: 11 additions & 9 deletions packages/core/src/operators/foundation/_slidingWindow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,21 @@ export function withSlidingWindow<T, S extends Constructor<MotionNextOperable<T>
* dispatches, make a copy of each as you receive it.
*/
_slidingWindow(length: number = 2): ObservableWithMotionOperators<Array<T>> {
const result: Array<T> = [];
return this._nextOperator({
operation({ emit }) {
const result: Array<T> = [];

return this._nextOperator(
(value: T, dispatch: NextChannel<Array<T>>) => {
result.push(value);
return ({ upstream }) => {
result.push(upstream);

if (result.length > length) {
result.shift();
}
if (result.length > length) {
result.shift();
}

dispatch(result);
emit(result);
}
}
);
});
}
};
}
20 changes: 10 additions & 10 deletions packages/core/src/operators/foundation/_tap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ import {
} from '../../types';

export interface MotionTappable<T> {
_tap(operation: (value: T) => any): ObservableWithMotionOperators<T>;
_tap(sideEffect: (value: T) => any): ObservableWithMotionOperators<T>;
}

export function withTap<T, S extends Constructor<MotionNextOperable<T>>>(superclass: S): S & Constructor<MotionTappable<T>> {
return class extends superclass implements MotionTappable<T> {
/**
* Calls `operation` for each value in the stream. The result of
* `operation` is ignored - values received from upstream are passed-through
* to the observer.
* Calls `sideEffect` for each value in the stream. The result of
* `sideEffect` is ignored - values received from upstream are passed-
* through to the observer.
*/
_tap(operation: (value: T) => any): ObservableWithMotionOperators<T> {
return this._nextOperator(
(value: T, dispatch: NextChannel<T>) => {
operation(value);
dispatch(value);
_tap(sideEffect: (value: T) => any): ObservableWithMotionOperators<T> {
return this._nextOperator({
operation: ({ emit }) => ({ upstream }) => {
sideEffect(upstream);
emit(upstream);
}
);
});
}
};
}
8 changes: 4 additions & 4 deletions packages/core/src/operators/inverted.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ export function withInverted<T, S extends Constructor<MotionNextOperable<T>>>(su
* - `1 - value` when it receives a numeric value
*/
inverted<U extends T & number>(): ObservableWithMotionOperators<U> {
return (this as any as MotionNextOperable<U>)._nextOperator(
(value: U, dispatch: NextChannel<U>) => {
dispatch((1 - (value as number)) as U);
return (this as any as MotionNextOperable<U>)._nextOperator({
operation: ({ emit }) => ({ upstream }) => {
emit((1 - (upstream as number)) as U);
}
);
});
}
};
}
2 changes: 0 additions & 2 deletions packages/core/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ export interface Subject<T> extends Observable<T> {

export type EmittingOperation<D, T, U> = (kwargs: { emit: NextChannel<U> }) => (values: D & { upstream: T }) => void;

export type NextOperation<T, U> = (value: T, nextChannel: NextChannel<U>) => void;

export type Point2D = {
x: number,
y: number,
Expand Down

0 comments on commit 8914c14

Please sign in to comment.