Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(schedulers): fix asap and animationFrame schedulers to execute ac…
Browse files Browse the repository at this point in the history
…ross their respective async bou

The AsapScheduler and AnimationFrameSchedulers were totally busted. My bad. Now they execute their
scheduled actions in batches, but if actions reschedule while executing a batch, a new frame is
requested for the rescheduled action to execute in.

ReactiveX#1814
trxcllnt committed Jul 9, 2016
1 parent 2aa1433 commit 7ca4554
Showing 25 changed files with 592 additions and 696 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@
"prepublish": "shx rm -rf ./typings && typings install && npm run build_all",
"publish_docs": "./publish_docs.sh",
"test_mocha": "mocha --opts spec/support/default.opts spec-js",
"debug_mocha": "node-debug _mocha --opts spec/support/debug.opts spec-js",
"test_browser": "npm-run-all build_spec_browser && opn spec/support/mocha-browser-runner.html",
"test": "npm-run-all clean_spec build_spec test_mocha clean_spec",
"tests2png": "npm run build_spec && mkdirp tmp/docs/img && mkdirp spec-js/support && shx cp spec/support/*.opts spec-js/support/ && mocha --opts spec/support/tests2png.opts spec-js",
2 changes: 1 addition & 1 deletion spec/support/debug.opts
Original file line number Diff line number Diff line change
@@ -8,7 +8,7 @@
--bail
--full-trace
--check-leaks
--globals WebSocket,FormData
--globals WebSocket,FormData,XDomainRequest,ActiveXObject

--recursive
--timeout 100000
88 changes: 0 additions & 88 deletions src/MiscJSDoc.ts
Original file line number Diff line number Diff line change
@@ -9,7 +9,6 @@ import {Subscriber} from './Subscriber';
import {TeardownLogic} from './Subscription';
import {Observable} from './Observable';
import {Subscription} from './Subscription';
import {Action} from './scheduler/Action';
import './scheduler/MiscJSDoc';
import './observable/dom/MiscJSDoc';

@@ -130,90 +129,3 @@ export class ObserverDoc<T> {
return void 0;
}
}

/**
* An execution context and a data structure to order tasks and schedule their
* execution. Provides a notion of (potentially virtual) time, through the
* `now()` getter method.
*
* Each unit of work in a Scheduler is called an {@link Action}.
*
* ```ts
* interface Scheduler {
* now(): number;
* schedule(work, delay?, state?): Subscription;
* flush(): void;
* active: boolean;
* actions: Action[];
* scheduledId: number;
* }
* ```
*
* @interface
* @name Scheduler
* @noimport true
*/
export class SchedulerDoc {
/**
* A getter method that returns a number representing the current time
* (at the time this function was called) according to the scheduler's own
* internal clock.
* @return {number} A number that represents the current time. May or may not
* have a relation to wall-clock time. May or may not refer to a time unit
* (e.g. milliseconds).
*/
now(): number {
return 0;
}

/**
* Schedules a function, `work`, for execution. May happen at some point in
* the future, according to the `delay` parameter, if specified. May be passed
* some context object, `state`, which will be passed to the `work` function.
*
* The given arguments will be processed an stored as an Action object in a
* queue of actions.
*
* @param {function(state: ?T): ?Subscription} work A function representing a
* task, or some unit of work to be executed by the Scheduler.
* @param {number} [delay] Time to wait before executing the work, where the
* time unit is implicit and defined by the Scheduler itself.
* @param {T} [state] Some contextual data that the `work` function uses when
* called by the Scheduler.
* @return {Subscription} A subscription in order to be able to unsubscribe
* the scheduled work.
*/
schedule<T>(work: (state?: T) => Subscription | void, delay?: number, state?: T): Subscription {
return void 0;
}

/**
* Prompt the Scheduler to execute all of its queued actions, therefore
* clearing its queue.
* @return {void}
*/
flush(): void {
return void 0;
}

/**
* A flag to indicate whether the Scheduler is currently executing a batch of
* queued actions.
* @type {boolean}
*/
active: boolean = false;

/**
* The queue of scheduled actions as an array.
* @type {Action[]}
*/
actions: Action<any>[] = [];

/**
* An internal ID used to track the latest asynchronous task such as those
* coming from `setTimeout`, `setInterval`, `requestAnimationFrame`, and
* others.
* @type {number}
*/
scheduledId: number = 0;
}
20 changes: 9 additions & 11 deletions src/Rx.ts
Original file line number Diff line number Diff line change
@@ -157,17 +157,13 @@ export {UnsubscriptionError} from './util/UnsubscriptionError';
export {TimeInterval} from './operator/timeInterval';
export {Timestamp} from './operator/timestamp';
export {TestScheduler} from './testing/TestScheduler';
export {VirtualTimeScheduler} from './scheduler/VirtualTimeScheduler';
export {VirtualTimeScheduler} from './scheduler/virtual';
export {AjaxRequest, AjaxResponse, AjaxError, AjaxTimeoutError} from './observable/dom/AjaxObservable';

import {asap} from './scheduler/asap';
import {async} from './scheduler/async';
import {queue} from './scheduler/queue';
import {animationFrame} from './scheduler/animationFrame';
import {AsapScheduler} from './scheduler/AsapScheduler';
import {AsyncScheduler} from './scheduler/AsyncScheduler';
import {QueueScheduler} from './scheduler/QueueScheduler';
import {AnimationFrameScheduler} from './scheduler/AnimationFrameScheduler';
import {AsapScheduler, asap} from './scheduler/asap';
import {AsyncScheduler, async} from './scheduler/async';
import {QueueScheduler, queue} from './scheduler/queue';
import {AnimationFrameScheduler, animationFrame} from './scheduler/animationFrame';
import {$$rxSubscriber as rxSubscriber} from './symbol/rxSubscriber';
import {$$iterator as iterator} from './symbol/iterator';
import observable from 'symbol-observable';
@@ -184,12 +180,14 @@ import observable from 'symbol-observable';
* asynchronous conversions.
* @property {Scheduler} async Schedules work with `setInterval`. Use this for
* time-based operations.
* @property {Scheduler} animation Schedules work with `requestAnimationFrame`.
* Use this for synchronizing with the platform's painting
*/
let Scheduler = {
asap,
async,
queue,
animationFrame,
async
animationFrame
};

/**
102 changes: 92 additions & 10 deletions src/Scheduler.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,93 @@
import {Subscription} from './Subscription';
import {Action} from './scheduler/Action';

export interface Scheduler {
now(): number;
schedule<T>(work: (state?: T) => Subscription | void, delay?: number, state?: T): Subscription;
flush(): void;
active: boolean;
actions: Action<any>[]; // XXX: use `any` to remove type param `T` from `Scheduler`.
scheduledId: number;
import {Subscription, TeardownLogic} from './Subscription';

/**
* An execution context and a data structure to order tasks and schedule their
* execution. Provides a notion of (potentially virtual) time, through the
* `now()` getter method.
*
* Each unit of work in a Scheduler is called an {@link Action}.
*
* ```ts
* class Scheduler extends Array {
* now(): number;
* schedule(work, delay?, state?): Subscription;
* }
* ```
*
* @class Scheduler
*/
export class Scheduler extends Array {

public static now: () => number = Date.now ? Date.now : () => +new Date();

constructor(private SchedulerAction: typeof Action,
now: () => number = Scheduler.now) {
super();
this.now = now;
}

/**
* A getter method that returns a number representing the current time
* (at the time this function was called) according to the scheduler's own
* internal clock.
* @return {number} A number that represents the current time. May or may not
* have a relation to wall-clock time. May or may not refer to a time unit
* (e.g. milliseconds).
*/
public now: () => number;

/**
* Schedules a function, `work`, for execution. May happen at some point in
* the future, according to the `delay` parameter, if specified. May be passed
* some context object, `state`, which will be passed to the `work` function.
*
* The given arguments will be processed an stored as an Action object in a
* queue of actions.
*
* @param {function(state: ?T): ?Subscription} work A function representing a
* task, or some unit of work to be executed by the Scheduler.
* @param {number} [delay] Time to wait before executing the work, where the
* time unit is implicit and defined by the Scheduler itself.
* @param {T} [state] Some contextual data that the `work` function uses when
* called by the Scheduler.
* @return {Subscription} A subscription in order to be able to unsubscribe
* the scheduled work.
*/
public schedule<T>(work: (state?: T) => TeardownLogic, delay: number = 0, state?: T): Subscription {
return new this.SchedulerAction<T>(this, work).schedule(state, delay);
}
}


/**
* A unit of work to be executed in a {@link Scheduler}. An action is typically
* created from within a Scheduler and an RxJS user does not need to concern
* themselves about creating and manipulating an Action.
*
* ```ts
* class Action<T> extends Subscription {
* new (work: (state?: T) => TeardownLogic, scheduler: Scheduler);
* schedule(state?: T, delay: number = 0): Subscription;
* }
* ```
*
* @class Action<T>
*/
export class Action<T> extends Subscription {
constructor(scheduler: Scheduler, work: (state?: T) => TeardownLogic) {
super();
}
/**
* Schedules this action on its parent Scheduler for execution. May be passed
* some context object, `state`. May happen at some point in the future,
* according to the `delay` parameter, if specified.
* @param {any} [state] Some contextual data that the `work` function uses when
* called by the Scheduler.
* @param {number} [delay] Time to wait before executing the work, where the
* time unit is implicit and defined by the Scheduler.
* @return {void}
*/
public schedule(state?: T, delay: number = 0): Subscription {
return this;
}
}
20 changes: 9 additions & 11 deletions src/observable/GenerateObservable.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
import {Observable} from '../Observable' ;
import {Scheduler} from '../Scheduler';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {Action} from '../scheduler/Action';

import {Action, Scheduler} from '../Scheduler';
import {isScheduler} from '../util/isScheduler';

const selfSelector = <T>(value: T) => value;
@@ -70,16 +68,16 @@ export class GenerateObservable<T, S> extends Observable<T> {
* to send out observer messages.
*
* <img src="./img/generate.png" width="100%">
*
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
* var res = Rx.Observable.generate(0, x => x < 10, x => x + 1, x => x);
*
*
* @example <caption>Using asap scheduler, produces sequence of 2, 3, 5, then completes.</caption>
* var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, x => x + 1, Rx.Scheduler.asap);
*
* @see {@link from}
* @see {@link create}
*
*
* @param {S} initialState Initial state.
* @param {function (state: S): boolean} condition Condition to terminate generation (upon returning false).
* @param {function (state: S): S} iterate Iteration step function.
@@ -98,12 +96,12 @@ export class GenerateObservable<T, S> extends Observable<T> {
* producing the sequence's elements, using the specified scheduler
* to send out observer messages.
* The overload uses state as an emitted value.
*
*
* <img src="./img/generate.png" width="100%">
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
* var res = Rx.Observable.generate(0, x => x < 10, x => x + 1);
*
*
* @example <caption>Using asap scheduler, produces sequence of 1, 2, 4, then completes.</caption>
* var res = Rx.Observable.generate(1, x => x < 5, x => x * 2, Rx.Scheduler.asap);
*
@@ -127,7 +125,7 @@ export class GenerateObservable<T, S> extends Observable<T> {
* to send out observer messages.
* The overload accepts options object that might contain inital state, iterate,
* condition and scheduler.
*
*
* <img src="./img/generate.png" width="100%">
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
@@ -151,7 +149,7 @@ export class GenerateObservable<T, S> extends Observable<T> {
* to send out observer messages.
* The overload accepts options object that might contain inital state, iterate,
* condition, result selector and scheduler.
*
*
* <img src="./img/generate.png" width="100%">
*
* @example <caption>Produces sequence of 0, 1, 2, ... 9, then completes.</caption>
@@ -293,4 +291,4 @@ export class GenerateObservable<T, S> extends Observable<T> {
}
return (<Action<SchedulerState<T, S>>><any>this).schedule(state);
}
}
}
7 changes: 3 additions & 4 deletions src/operator/bufferTime.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
import {Operator} from '../Operator';
import {async} from '../scheduler/async';
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';
import {Observable} from '../Observable';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {async} from '../scheduler/async';
import {Action, Scheduler} from '../Scheduler';
import {isScheduler} from '../util/isScheduler';

/**
7 changes: 3 additions & 4 deletions src/operator/windowTime.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
import {Subject} from '../Subject';
import {Operator} from '../Operator';
import {async} from '../scheduler/async';
import {Subscriber} from '../Subscriber';
import {Observable} from '../Observable';
import {Subject} from '../Subject';
import {Subscription} from '../Subscription';
import {Scheduler} from '../Scheduler';
import {Action} from '../scheduler/Action';
import {async} from '../scheduler/async';
import {Action, Scheduler} from '../Scheduler';

/**
* Branch out the source Observable values as a nested Observable periodically
12 changes: 0 additions & 12 deletions src/scheduler/Action.ts

This file was deleted.

Loading

0 comments on commit 7ca4554

Please sign in to comment.