Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async dispatched config handlers #7344

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 86 additions & 90 deletions packages/rxjs/spec/operators/share-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,12 @@ import {
} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from '../helpers/observableMatcher';
import { SinonSpy, spy } from 'sinon';
import { spy } from 'sinon';

const syncNotify = of(1);
const asapNotify = scheduled(syncNotify, asapScheduler);
const syncError = throwError(() => new Error());

function spyOnUnhandledError(fn: (spy: SinonSpy) => void): void {
const prevOnUnhandledError = config.onUnhandledError;

try {
const onUnhandledError = spy();
config.onUnhandledError = onUnhandledError;

fn(onUnhandledError);
} finally {
config.onUnhandledError = prevOnUnhandledError;
}
}

/** @test {share} */
describe('share', () => {
let rxTest: TestScheduler;
Expand Down Expand Up @@ -810,9 +797,22 @@ describe('share', () => {
});
});

it('should not reset on refCount 0 if reset notifier errors before emitting any value', () => {
spyOnUnhandledError((onUnhandledError) => {
describe('when config.onUnhandledError is set', () => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding this describe block is what I think messed with the whitespace diff in other parts of the file.

afterEach(() => {
config.onUnhandledError = null;
});

it('should not reset on refCount 0 if reset notifier errors before emitting any value', (done) => {
const error = new Error();
let calls = 0;

config.onUnhandledError = spy((err) => {
calls++;
expect(err).to.equal(error);
if (calls === 2) {
done();
}
});

rxTest.run(({ hot, cold, expectObservable, expectSubscriptions }) => {
const source = hot(' ---1---2---3---4---(5 )---|');
Expand All @@ -830,17 +830,16 @@ describe('share', () => {
expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

expect(onUnhandledError).to.have.been.calledTwice;
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
expect(onUnhandledError.getCall(1)).to.have.been.calledWithExactly(error);
});
});

it('should not reset on error if reset notifier errors before emitting any value', () => {
spyOnUnhandledError((onUnhandledError) => {
it('should not reset on error if reset notifier errors before emitting any value', (done) => {
const error = new Error();

config.onUnhandledError = spy((err) => {
expect(err).to.equal(error);
done();
});

rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold(' ---1---2---# ');
const sourceSubs = ' ^----------! ';
Expand All @@ -856,89 +855,86 @@ describe('share', () => {
expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

expect(onUnhandledError).to.have.been.calledOnce;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These tests were errant, in that they were able to check to see if onUnhandledError was dispatched synchronously, which it was never supposed to be.

expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
});
});

it('should not reset on complete if reset notifier errors before emitting any value', () => {
spyOnUnhandledError((onUnhandledError) => {
const error = new Error();

rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const source = cold(' ---1---2---| ');
const sourceSubs = ' ^----------! ';
const expected = ' ---1---2------|';
const subscription = ' ^--------------';
const firstPause = cold(' -------|');
const reset = cold(' --# ', undefined, error);

const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2));
it('should not reset on complete if reset notifier errors before emitting any value', (done) => {
const error = new Error();

const result = concat(sharedSource, firstPause, sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
});

expect(onUnhandledError).to.have.been.calledOnce;
expect(onUnhandledError.getCall(0)).to.have.been.calledWithExactly(error);
config.onUnhandledError = spy((err) => {
expect(err).to.equal(error);
done();
});
});

it('should not call "resetOnRefCountZero" on error', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2#) ');
// source: ' ---1---(2#) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2#) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'
const source = cold(' ---1---2---| ');
const sourceSubs = ' ^----------! ';
const expected = ' ---1---2------|';
const subscription = ' ^--------------';
const firstPause = cold(' -------|');
const reset = cold(' --# ', undefined, error);

const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero }));
const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero: false }), take(2));

const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource);
const result = concat(sharedSource, firstPause, sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
});
});

it('should not call "resetOnRefCountZero" on complete', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2|) ');
// source: ' ---1---(2|) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2|) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'

const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero }));

const result = concat(sharedSource, firstPause, sharedSource);
it('should not call "resetOnRefCountZero" on error', () => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything under this section is really a whitespace change. I'm not sure why the diff is flipping out here. This was just from reorganizing things in the file I think.

rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2#) ');
// source: ' ---1---(2#) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2#) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'

const sharedSource = source.pipe(share({ resetOnError: () => reset, resetOnRefCountZero }));

const result = concat(sharedSource.pipe(onErrorResumeNextWith(firstPause)), sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
});

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
it('should not call "resetOnRefCountZero" on complete', () => {
rxTest.run(({ cold, expectObservable, expectSubscriptions }) => {
const resetOnRefCountZero = spy(() => EMPTY);

const source = cold(' ---1---(2|) ');
// source: ' ---1---(2|) '
const sourceSubs = [
' ^------(! ) ',
// break the line, please
' -------(- )---^------(! ) ',
];
const expected = ' ---1---(2 )------1---(2|) ';
const subscription = ' ^------(- )----------(- ) ';
const firstPause = cold(' (- )---| ');
const reset = cold(' (- )-r ');
// reset: ' (- )-r'

const sharedSource = source.pipe(share({ resetOnComplete: () => reset, resetOnRefCountZero }));

const result = concat(sharedSource, firstPause, sharedSource);

expectObservable(result, subscription).toBe(expected);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
expect(resetOnRefCountZero).to.not.have.been.called;
});
});
});
3 changes: 1 addition & 2 deletions packages/rxjs/src/internal/Subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { Subscription } from './Subscription';
import { config } from './config';
import { reportUnhandledError } from './util/reportUnhandledError';
import { nextNotification, errorNotification, COMPLETE_NOTIFICATION } from './NotificationFactories';
import { timeoutProvider } from './scheduler/timeoutProvider';

export interface SubscriberOverrides<T> {
/**
Expand Down Expand Up @@ -267,7 +266,7 @@ function createSafeObserver<T>(observerOrNext?: Partial<Observer<T>> | ((value:
*/
function handleStoppedNotification(notification: ObservableNotification<any>, subscriber: Subscriber<any>) {
const { onStoppedNotification } = config;
onStoppedNotification && timeoutProvider.setTimeout(() => onStoppedNotification(notification, subscriber));
onStoppedNotification && setTimeout(() => onStoppedNotification(notification, subscriber));
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One important bit is here.

}

function hasAddAndUnsubscribe(value: any): value is Subscription {
Expand Down
3 changes: 1 addition & 2 deletions packages/rxjs/src/internal/util/reportUnhandledError.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { config } from '../config';
import { timeoutProvider } from '../scheduler/timeoutProvider';

/**
* Handles an error on another job either with the user-configured {@link onUnhandledError},
Expand All @@ -11,7 +10,7 @@ import { timeoutProvider } from '../scheduler/timeoutProvider';
* @param err the error to report
*/
export function reportUnhandledError(err: any) {
timeoutProvider.setTimeout(() => {
setTimeout(() => {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The other important bit is here.

const { onUnhandledError } = config;
if (onUnhandledError) {
// Execute the user-configured error handler.
Expand Down