Skip to content

Commit

Permalink
refactor: isolate reliance on SchedulerAction this context (#7322)
Browse files Browse the repository at this point in the history
This goes through and moves all of the reliance on the `this` context for scheduling to a single point, `executeSchedule`. It also moves common complexities to that helper function. All of this is done in preparation for creating newer, simplified schedulers
  • Loading branch information
benlesh authored Dec 19, 2023
1 parent 9e638ff commit e5f4140
Show file tree
Hide file tree
Showing 7 changed files with 101 additions and 100 deletions.
25 changes: 25 additions & 0 deletions packages/rxjs/spec/scheduled/scheduled-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,29 @@ describe('scheduled', () => {
done();
});
});

it('should handle scheduling a promise that unsubscribes prior to complete', (done) => {
const results: any[] = [];
const input = Promise.resolve('x'); // strings are iterables
const subscription = scheduled(input, testScheduler).subscribe({
next(value) {
results.push(value);
subscription.unsubscribe();
},
complete() { results.push('done'); },
});

expect(results).to.deep.equal([]);

// Promises force async, so we can't schedule synchronously, no matter what.
testScheduler.flush();
expect(results).to.deep.equal([]);

Promise.resolve().then(() => {
// NOW it should work, as the other promise should have resolved.
testScheduler.flush();
expect(results).to.deep.equal(['x']);
done();
});
});
});
10 changes: 7 additions & 3 deletions packages/rxjs/src/internal/observable/range.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { SchedulerLike } from '../types.js';
import { Observable } from '../Observable.js';
import { EMPTY } from './empty.js';
import { executeSchedule } from '../util/executeSchedule.js';

export function range(start: number, count?: number): Observable<number>;

Expand Down Expand Up @@ -72,14 +73,17 @@ export function range(start: number, count?: number, scheduler?: SchedulerLike):
? // The deprecated scheduled path.
(subscriber) => {
let n = start;
return scheduler.schedule(function () {
const emit = () => {
if (n < end) {
subscriber.next(n++);
this.schedule();
if (!subscriber.closed) {
executeSchedule(subscriber, scheduler, emit);
}
} else {
subscriber.complete();
}
});
};
executeSchedule(subscriber, scheduler, emit);
}
: // Standard synchronous range.
(subscriber) => {
Expand Down
25 changes: 19 additions & 6 deletions packages/rxjs/src/internal/observable/timer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { SchedulerLike } from '../types.js';
import { asyncScheduler } from '../scheduler/async.js';
import { isScheduler } from '../util/isScheduler.js';
import { isValidDate } from '../util/isDate.js';
import { executeSchedule } from '../util/executeSchedule.js';

/**
* Creates an observable that will wait for a specified time period, or exact date, before
Expand Down Expand Up @@ -167,20 +168,32 @@ export function timer(
let n = 0;

// Start the timer.
return scheduler.schedule(function () {
if (!subscriber.closed) {
// Emit the next value and increment.
return executeSchedule(
subscriber,
scheduler,
() => {
// Emit the first value and schedule the next.
subscriber.next(n++);

if (0 <= intervalDuration) {
// If we have a interval after the initial timer,
// reschedule with the period.
this.schedule(undefined, intervalDuration);
executeSchedule(
subscriber,
scheduler,
() => {
// Emit the interval values.
subscriber.next(n++);
},
intervalDuration,
true
);
} else {
// We didn't have an interval. So just complete.
subscriber.complete();
}
}
}, due);
},
due
);
});
}
67 changes: 22 additions & 45 deletions packages/rxjs/src/internal/operators/debounceTime.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import { asyncScheduler } from '../scheduler/async.js';
import type { Subscription} from '../Observable.js';
import type { Subscription } from '../Observable.js';
import { Observable, operate } from '../Observable.js';
import type { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike } from '../types.js';
import type { MonoTypeOperatorFunction, SchedulerLike } from '../types.js';
import { executeSchedule } from '../util/executeSchedule.js';

/**
* Emits a notification from the source Observable only after a particular time span
Expand Down Expand Up @@ -62,64 +63,40 @@ import type { MonoTypeOperatorFunction, SchedulerAction, SchedulerLike } from '.
export function debounceTime<T>(dueTime: number, scheduler: SchedulerLike = asyncScheduler): MonoTypeOperatorFunction<T> {
return (source) =>
new Observable((destination) => {
let activeTask: Subscription | null = null;
let lastValue: T | null = null;
let lastTime: number | null = null;
let scheduling = false;

const emit = () => {
if (scheduling || activeTask) {
// We have a value! Free up memory first, then emit the value.
if (activeTask) {
activeTask.unsubscribe();
activeTask = null;
}
const value = lastValue!;
lastValue = null;
destination.next(value);
}
};
function emitWhenIdle(this: SchedulerAction<unknown>) {
// This is called `dueTime` after the first value
// but we might have received new values during this window!

const targetTime = lastTime! + dueTime;
const now = scheduler.now();
if (now < targetTime) {
// On that case, re-schedule to the new target
activeTask = this.schedule(undefined, targetTime - now);
destination.add(activeTask);
return;
}

emit();
}
let lastValue: T;
let activeTask: Subscription | void;

source.subscribe(
operate({
destination,
next: (value: T) => {
lastValue = value;
lastTime = scheduler.now();
// Clear any pending task and schedule a new one.
activeTask?.unsubscribe();

// Only set up a task if it's not already up
if (!activeTask) {
scheduling = true;
activeTask = scheduler.schedule(emitWhenIdle, dueTime);
scheduling = false;
// Set activeTask as intermediary Subscription to handle synchronous schedulers
destination.add(activeTask);
}
activeTask = executeSchedule(
destination,
scheduler,
() => {
activeTask = undefined;
const v = lastValue;
lastValue = null!;
destination.next(v);
},
dueTime
);
},
complete: () => {
// Source completed.
// Emit any pending debounced values then complete
emit();
if (activeTask) {
destination.next(lastValue);
}
destination.complete();
},
finalize: () => {
// Finalization.
lastValue = activeTask = null;
lastValue = activeTask = null!;
},
})
);
Expand Down
6 changes: 2 additions & 4 deletions packages/rxjs/src/internal/operators/timeout.ts
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ export function timeout<T, O extends ObservableInput<any>, M>(
let originalSourceSubscription: Subscription;
// The subscription for our timeout timer. This changes
// every time we get a new value.
let timerSubscription: Subscription;
let timerSubscription: Subscription | void;
// A bit of state we pass to our with and error factories to
// tell what the last value we saw was.
let lastValue: T | null = null;
Expand Down Expand Up @@ -353,9 +353,7 @@ export function timeout<T, O extends ObservableInput<any>, M>(
each! > 0 && startTimer(each!);
},
finalize: () => {
if (!timerSubscription?.closed) {
timerSubscription?.unsubscribe();
}
timerSubscription?.unsubscribe();
// Be sure not to hold the last value in memory after unsubscription
// it could be quite large.
lastValue = null;
Expand Down
21 changes: 9 additions & 12 deletions packages/rxjs/src/internal/scheduled/scheduleArray.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,24 @@
import { Observable } from '../Observable.js';
import type { SchedulerLike } from '../types.js';
import { executeSchedule } from '../util/executeSchedule.js';

export function scheduleArray<T>(input: ArrayLike<T>, scheduler: SchedulerLike) {
return new Observable<T>((subscriber) => {
// The current array index.
let i = 0;
// Start iterating over the array like on a schedule.
return scheduler.schedule(function () {
const emit = () => {
// If we have hit the end of the array, complete.
if (i === input.length) {
// If we have hit the end of the array like in the
// previous job, we can complete.
subscriber.complete();
} else {
// Otherwise let's next the value at the current index,
// Otherwise, next the value at the current index,
// then increment our index.
subscriber.next(input[i++]);
// If the last emission didn't cause us to close the subscriber
// (via take or some side effect), reschedule the job and we'll
// make another pass.
if (!subscriber.closed) {
this.schedule();
}
executeSchedule(subscriber, scheduler, emit);
}
});
};

// Start iterating over the array like on a schedule.
return executeSchedule(subscriber, scheduler, emit);
});
}
47 changes: 17 additions & 30 deletions packages/rxjs/src/internal/util/executeSchedule.ts
Original file line number Diff line number Diff line change
@@ -1,44 +1,31 @@
import type { Subscription } from '../Observable.js';
import type { SchedulerAction, SchedulerLike } from '../types.js';

export function executeSchedule(
parentSubscription: Subscription,
scheduler: SchedulerLike,
work: () => void,
delay: number,
repeat: true
): void;
export function executeSchedule(
parentSubscription: Subscription,
scheduler: SchedulerLike,
work: () => void,
delay?: number,
repeat?: false
): Subscription;

export function executeSchedule(
parentSubscription: Subscription,
scheduler: SchedulerLike,
work: () => void,
delay = 0,
repeat = false
): Subscription | void {
const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction<any>) {
work();
if (repeat) {
parentSubscription.add(this.schedule(null, delay));
} else {
this.unsubscribe();
}
}, delay);
if (!parentSubscription.closed) {
const scheduleSubscription = scheduler.schedule(function (this: SchedulerAction<any>) {
work();
if (repeat) {
parentSubscription.add(this.schedule(null, delay));
} else {
this.unsubscribe();
}
}, delay);

parentSubscription.add(scheduleSubscription);
parentSubscription.add(scheduleSubscription);

if (!repeat) {
// Because user-land scheduler implementations are unlikely to properly reuse
// Actions for repeat scheduling, we can't trust that the returned subscription
// will control repeat subscription scenarios. So we're trying to avoid using them
// incorrectly within this library.
return scheduleSubscription;
if (!repeat) {
// Because user-land scheduler implementations are unlikely to properly reuse
// Actions for repeat scheduling, we can't trust that the returned subscription
// will control repeat subscription scenarios. So we're trying to avoid using them
// incorrectly within this library.
return scheduleSubscription;
}
}
}

0 comments on commit e5f4140

Please sign in to comment.