diff --git a/packages/core/src/operators/dedupe.ts b/packages/core/src/operators/dedupe.ts index 24f07208..2b6ade48 100644 --- a/packages/core/src/operators/dedupe.ts +++ b/packages/core/src/operators/dedupe.ts @@ -34,26 +34,25 @@ export function withDedupe { - // If upstream observable is synchronous, `_multicast` will skip the first - // value (because its observers will not yet have been registered when - // upstream dispatches). Thus, we manually multicast here. - const lastValueByChannel = new Map, T>(); - - return this._nextOperator( - (value: T, dispatch: NextChannel) => { - const lastValue = lastValueByChannel.get(dispatch); - - if (lastValueByChannel.has(dispatch) && areEqual(value, lastValue)) { - return; + return this._nextOperator({ + operation({ emit }) { + let emitted = false; + let lastValue: T; + + return ({ upstream }) => { + if (emitted && areEqual(upstream, lastValue)) { + return; + } + + // To prevent a potential infinite loop, these flags must be set + // before dispatching the result to the observer. + lastValue = upstream; + emitted = true; + + emit(upstream); } - - // To prevent a potential infinite loop, this must be set before - // dispatching the result to the observer - lastValueByChannel.set(dispatch, value); - - dispatch(value); } - ); + }); } }; } diff --git a/packages/core/src/operators/foundation/__tests__/_nextOperator.test.ts b/packages/core/src/operators/foundation/__tests__/_nextOperator.test.ts index 74ec79f3..73acc9c3 100644 --- a/packages/core/src/operators/foundation/__tests__/_nextOperator.test.ts +++ b/packages/core/src/operators/foundation/__tests__/_nextOperator.test.ts @@ -29,23 +29,17 @@ import { } from 'sinon'; import { - createMockObserver, -} from 'material-motion-testing-utils'; - -import { - MotionObservable, + MotionSubject, } from '../../../observables/'; describe('motionObservable._nextOperator', () => { - let stream; - let mockObserver; + let subject; let nextListener; beforeEach( () => { - mockObserver = createMockObserver(); - stream = new MotionObservable(mockObserver.connect); + subject = new MotionSubject(); nextListener = stub(); } @@ -55,17 +49,17 @@ describe('motionObservable._nextOperator', () => { const waitAMoment = Promise.resolve(); - const makeValuesAsync = (value, nextChannel) => { - waitAMoment.then( - () => { - nextChannel(value); - } - ); - }; - - stream._nextOperator(makeValuesAsync).subscribe(nextListener); + subject._nextOperator({ + operation: ({ emit }) => ({ upstream }) => { + waitAMoment.then( + () => { + emit(upstream); + } + ); + } + }).subscribe(nextListener); - mockObserver.next(1); + subject.next(1); expect(nextListener).not.to.have.been.called; return waitAMoment.then( @@ -86,13 +80,13 @@ describe('motionObservable._nextOperator', this.listener(value); } - stream._nextOperator( - (value, nextChannel) => nextChannel(value) - ).subscribe({ + subject._nextOperator({ + operation: ({ emit }) => ({ upstream }) => emit(upstream) + }).subscribe({ next: callOwnListener.bind(dictWithListener) }); - mockObserver.next(1); + subject.next(1); expect(nextListener).to.have.been.calledWith(1); } diff --git a/packages/core/src/operators/foundation/_filter.ts b/packages/core/src/operators/foundation/_filter.ts index 0790e202..b4ae52cb 100644 --- a/packages/core/src/operators/foundation/_filter.ts +++ b/packages/core/src/operators/foundation/_filter.ts @@ -33,13 +33,13 @@ export function withFilter>>(supe * values that return `true` to the observer. */ _filter(predicate: Predicate): ObservableWithMotionOperators { - return this._nextOperator( - (value: T, dispatch: NextChannel) => { - if (predicate(value)) { - dispatch(value); + return this._nextOperator({ + operation: ({ emit }) => ({ upstream }) => { + if (predicate(upstream)) { + emit(upstream); } } - ); + }); } }; } diff --git a/packages/core/src/operators/foundation/_flattenIterables.ts b/packages/core/src/operators/foundation/_flattenIterables.ts index 8f7d7a67..8d75cef7 100644 --- a/packages/core/src/operators/foundation/_flattenIterables.ts +++ b/packages/core/src/operators/foundation/_flattenIterables.ts @@ -37,14 +37,14 @@ export function withFlattenIterables(): ObservableWithMotionOperators { - return this._nextOperator( - (values: T | Iterable, dispatch: NextChannel) => { + return this._nextOperator({ + operation: ({ emit }) => ({ upstream: values }) => { if (isIterable(values)) { for (const value of values) { - dispatch(value); + emit(value); } } else { - dispatch(values as any as U); + emit(values as any as U); } } ); diff --git a/packages/core/src/operators/foundation/_map.ts b/packages/core/src/operators/foundation/_map.ts index 805eba44..e4cc54fc 100644 --- a/packages/core/src/operators/foundation/_map.ts +++ b/packages/core/src/operators/foundation/_map.ts @@ -32,9 +32,9 @@ export function withMap>>(supercl * result to the observer. */ _map(transform: (value: T) => U): ObservableWithMotionOperators { - return this._nextOperator( - (value: T, dispatch: NextChannel) => { - dispatch(transform(value)); + return this._nextOperator({ + operation: ({ emit }) => ({ upstream }) => { + emit(transform(upstream)); } ); } diff --git a/packages/core/src/operators/foundation/_nextOperator.ts b/packages/core/src/operators/foundation/_nextOperator.ts index 23bbd566..50afca94 100644 --- a/packages/core/src/operators/foundation/_nextOperator.ts +++ b/packages/core/src/operators/foundation/_nextOperator.ts @@ -20,15 +20,19 @@ import { import { Constructor, + EmittingOperation, NextChannel, - NextOperation, Observable, ObservableWithMotionOperators, Observer, } from '../../types'; +export type _NextOperatorArgs = { + operation: EmittingOperation<{}, T, U>, +} + export interface MotionNextOperable extends Observable { - _nextOperator(operation: NextOperation): ObservableWithMotionOperators; + _nextOperator(kwargs: _NextOperatorArgs): ObservableWithMotionOperators; } export function withNextOperator>>(superclass: S): S & Constructor> { @@ -42,13 +46,15 @@ export function withNextOperator>>(superc * `next` channel, transform it, and use the supplied callback to dispatch * the result to the observer's `next` channel. */ - _nextOperator(operation: NextOperation): ObservableWithMotionOperators { + _nextOperator({ operation }: _NextOperatorArgs): ObservableWithMotionOperators { return new MotionObservable( (observer: Observer) => { - const dispatch: NextChannel = observer.next.bind(observer); + const innerOperation = operation({ + emit: observer.next.bind(observer), + }); const subscription = this.subscribe( - (value: T) => operation(value, dispatch) + (value: T) => innerOperation({ upstream: value }) ); return subscription.unsubscribe; diff --git a/packages/core/src/operators/foundation/_remember.ts b/packages/core/src/operators/foundation/_remember.ts index eb937bca..a4de1f60 100644 --- a/packages/core/src/operators/foundation/_remember.ts +++ b/packages/core/src/operators/foundation/_remember.ts @@ -16,7 +16,6 @@ import { Constructor, - NextOperation, Observable, ObservableWithMotionOperators, Observer, diff --git a/packages/core/src/operators/foundation/_slidingWindow.ts b/packages/core/src/operators/foundation/_slidingWindow.ts index f362fd18..82e9ae1a 100644 --- a/packages/core/src/operators/foundation/_slidingWindow.ts +++ b/packages/core/src/operators/foundation/_slidingWindow.ts @@ -37,19 +37,21 @@ export function withSlidingWindow * dispatches, make a copy of each as you receive it. */ _slidingWindow(length: number = 2): ObservableWithMotionOperators> { - const result: Array = []; + return this._nextOperator({ + operation({ emit }) { + const result: Array = []; - return this._nextOperator( - (value: T, dispatch: NextChannel>) => { - result.push(value); + return ({ upstream }) => { + result.push(upstream); - if (result.length > length) { - result.shift(); - } + if (result.length > length) { + result.shift(); + } - dispatch(result); + emit(result); + } } - ); + }); } }; } diff --git a/packages/core/src/operators/foundation/_tap.ts b/packages/core/src/operators/foundation/_tap.ts index 0b4ddd13..d06b40ff 100644 --- a/packages/core/src/operators/foundation/_tap.ts +++ b/packages/core/src/operators/foundation/_tap.ts @@ -22,23 +22,23 @@ import { } from '../../types'; export interface MotionTappable { - _tap(operation: (value: T) => any): ObservableWithMotionOperators; + _tap(sideEffect: (value: T) => any): ObservableWithMotionOperators; } export function withTap>>(superclass: S): S & Constructor> { return class extends superclass implements MotionTappable { /** - * Calls `operation` for each value in the stream. The result of - * `operation` is ignored - values received from upstream are passed-through - * to the observer. + * Calls `sideEffect` for each value in the stream. The result of + * `sideEffect` is ignored - values received from upstream are passed- + * through to the observer. */ - _tap(operation: (value: T) => any): ObservableWithMotionOperators { - return this._nextOperator( - (value: T, dispatch: NextChannel) => { - operation(value); - dispatch(value); + _tap(sideEffect: (value: T) => any): ObservableWithMotionOperators { + return this._nextOperator({ + operation: ({ emit }) => ({ upstream }) => { + sideEffect(upstream); + emit(upstream); } - ); + }); } }; } diff --git a/packages/core/src/operators/inverted.ts b/packages/core/src/operators/inverted.ts index 81461fef..09583f2a 100644 --- a/packages/core/src/operators/inverted.ts +++ b/packages/core/src/operators/inverted.ts @@ -41,11 +41,11 @@ export function withInverted>>(su * - `1 - value` when it receives a numeric value */ inverted(): ObservableWithMotionOperators { - return (this as any as MotionNextOperable)._nextOperator( - (value: U, dispatch: NextChannel) => { - dispatch((1 - (value as number)) as U); + return (this as any as MotionNextOperable)._nextOperator({ + operation: ({ emit }) => ({ upstream }) => { + emit((1 - (upstream as number)) as U); } - ); + }); } }; } diff --git a/packages/core/src/types.ts b/packages/core/src/types.ts index 04b3d844..b8f25817 100644 --- a/packages/core/src/types.ts +++ b/packages/core/src/types.ts @@ -93,8 +93,6 @@ export interface Subject extends Observable { export type EmittingOperation = (kwargs: { emit: NextChannel }) => (values: D & { upstream: T }) => void; -export type NextOperation = (value: T, nextChannel: NextChannel) => void; - export type Point2D = { x: number, y: number,